首页 理论教育 如何优化Shuffle操作?

如何优化Shuffle操作?

时间:2023-06-29 理论教育 版权反馈
【摘要】:在实践中发现,对于针对超大数据量的Shuffle过程,调节该参数可以大幅度提升稳定性。·spark.shuffle.manager:该参数用于设置ShuffleManager的类型。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2及之后的版本默认都是SortShuffleManager。但是这种方式下依然会产生大量的磁盘文件,因此Shuffle Write性能有待提高。·spark.shuffle.consolidateFiles:如果使用HashShuffleManager,该参数有效。·spark.local.dir:Shuffle的Write阶段使用的本地磁盘目录。

如何优化Shuffle操作?

在Spark程序中,Shuffle是性能的最大瓶颈,因为Shuffle的过程往往伴随着磁盘I/O与网络I/O等开销,程序编写的一个准则就是“尽量避免使用需要Shuffle的算子,且在必须Shuffle时尽量减少Shuffle的数据量”,而在Shuffle不可避免时,也要尽量优化Shuffle的方方面面以调优性能。

这里针对Shffule过程中的一些主要参数,详细讲解各个参数的功能、默认值,以及基于实践经验给出的调优建议。更详细的关于Shuffle的解析,请参阅本书第7章Shuffle机制。

·spark.shuffle.file.buffer:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入Buffer缓冲中,待缓冲写满之后,才会溢写到磁盘,默认值为32 KB。调优建议:如果作业可用的内存资源较为充足,可以适当增加这个参数的大小(比如128 KB),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘I/O次数,进而提升性能。

·spark.reducer.maxSizeInFlight:该参数用于设置Shuffle Read Task的Buffer缓冲大小,而这个Buffer缓冲决定了每次能够拉取多少数据,默认值为48 MB。调优建议:如果作业可用的内存资源较为充足,可以适当增加这个参数的大小(如96 MB),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

·spark.shuffle.io.maxRetries:shuffle read task从shuffle write task所在结点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。默认值为3。调优建议:对于那些包含了特别耗时的Shuffle操作的作业,建议增加重试最大次数(如30次),以避免由于JVM的Full GC或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的Shuffle过程,调节该参数可以大幅度提升稳定性。

·spark.shuffle.io.retryWait:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认值是5 s。调优建议:建议加大间隔时长(如30 s),以增加Shuffle操作的稳定性。(www.xing528.com)

·spark.shuffle.memoryFraction与spark.storage.memoryFraction(StaticMemoryManager模式下),或spark.memory.fraction与spark.memory.storageFraction(UnifiedMemoryMan-ager模式下):详情参考9.3.3资源参数调优一节。

·spark.shuffle.manager:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有3个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2及之后的版本默认都是SortShuffleManager。Tungsten-Sort与Sort类似,但是使用了Tungsten计划中的堆外内存管理机制,内存使用效率更高。调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果用户的业务逻辑中需要该排序机制,则使用默认的SortShuffleManager即可;而如果用户的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里需要注意的是,随着Tungsten-Sort越来越稳定,可以尝试使用Tungsten-Sort。

·spark.shuffle.sort.bypassMergeThreshold:当ShuffleManager为SortShuffleManager时,如果Shuffle Read Task的数量小于这个阈值,则Shuffle Write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个Task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件,默认值是200。调优建议:当使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于Shuffle Read Task的数量。那么此时就会自动启用bypass机制,此时Map端就不会进行排序了,从而减少了排序的性能开销。但是这种方式下依然会产生大量的磁盘文件,因此Shuffle Write性能有待提高。

·spark.shuffle.consolidateFiles:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启Consolidate机制,会大幅度合并Shuffle Write的输出文件,在Shuffle Read Task数量特别多的情况下,这种方法可以极大地减少磁盘I/O开销,提升性能。默认值是False。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制外,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。

·spark.local.dir:Shuffle的Write阶段使用的本地磁盘目录。当Shuffle阶段磁盘I/O时间过长时,可以将该参数设置为多个I/O速度快的磁盘,通过增加I/O来优化Shuffle性能。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈