Spark“并行”写


spark本来已经是一个分布式的计算平台,按说不应该手工去处理并行/异步的事情。但是,最近我实现的一个spark任务,需要一次写出数十个分区的数据,虽然这些分区的数据之间完全独立,但坑爹的是,基础数据平台提供的写数据接口只支持同步的一次写一个分区的数据。这样造成的结果就是,用循环来实现时,虽然我有很多个计算节点,数据(RDD)也分布于各个节点之上,但是我只能等一个分区写完成后,再写下一个分区:因为“写分区”这个任务的下发是同步阻塞的。

1
2
partitions
.map(part => writeToDisk(data.filter(part), part))

引入Future


这里要感谢scala提供的Future方案。它可以方便的将同步的阻塞操作包装成异步操作并行下发。

配合Await.ready操作来等待所有future完成,我们可以将上面的代码改写为:

1
2
3
partitions
.map(part => Future { writeToDisk(data.filter(data.part == part), part) })
.map(f => Await.ready(f, Duration.Inf))


避免data重复计算


在spark中,我们知道使用cache/persist可以避免数据流的重复计算。在这里也是一样,Future之前需要将data用cache/persist打个点。

但是这样还!不!够!

在这里我们希望发生的事情是data在future之前先计算好(只计算一次),然后异步的分发下去写对应的分区。

但是由于spark的惰性计算特性,使用Future之后,多个job并行下发,每个job在执行时data都还没有计算出来,也就没有cache的数据。反应到spark ui上的jobs页面的情况就是,看上去多个job并行执行了,但是cache操作并没有带来tasks skipped。

这时,我们需要在future之前,强制把data计算出来并cache住。这里其实只需要调用一些不影响数据的action算子即可,例如data.count()。

最终的结果,在使用上面的改进措施之后,我的这个spark任务执行时间缩短了约60%。


推荐阅读:
使用双buffer无锁化
不要拷贝
一个新朋友 Git Hooks

转载请注明出处: http://blog.guoyb.com/2018/04/21/spark-scala-future/

欢迎使用微信扫描下方二维码,关注我的微信公众号TechTalking,技术·生活·思考:
后端技术小黑屋

Comments