首页 理论教育 Shuffle数据读写的源代码解析

Shuffle数据读写的源代码解析

时间:2023-06-29 理论教育 版权反馈
【摘要】:首先查看ShuffleMapTask中的数据写流程,具体代码如下。注意,是宽依赖的RDD,而非ShuffleRDD。

Shuffle数据读写的源代码解析

1.Shuffle写数据的源代码解析

从Spark Shuffle的整体框架中可以看到,在ShuffleManager中提供了Shuffle相关数据块的写入与读取,即对应的接口getWriter与getReader。

在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependency的RDD,查看在执行过程中,Shuffle框架中的数据读写接口getWriter与getReader如何使用,通过这种具体案例的方式来加深对源代码的理解。

Spark中Shuffle具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析可以知道,Spark中的一个作业可以根据宽依赖切分Stage,而在Stage中,相应的Task也包含两种,即ResultTask与ShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器将一个RDD的元素拆分到多个bucket中,此时通过ShuffleManager的getWriter接口来获取数据与bucket的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDD的compute对内部数据进行计算,而在带有ShuffleDependency的RDD中,在compute计算时,会通过ShuffleManager的getReader接口获取上一个Stage的Shuffle输出结果来作为本次Task的输入数据。

首先查看ShuffleMapTask中的数据写流程,具体代码如下。

2.Shuffle读数据的源代码解析

对应的数据读取器,从RDD的5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法。下面以包含宽依赖的RDD、CoGroupedRDD为例,查看如何获取Shuffle的数据。具体代码如下。

从代码中可以看到,宽依赖的RDD的compute操作中,最终是通过SparkEnv中的Shuf-fleManager实例的getReader方法,获取数据的读取器的,然后再次调用读取器的read方法读取指定分区范围的Shuffle数据。

注意,是宽依赖的RDD,而非ShuffleRDD。除了ShuffleRDD之外,还有其他RDD也可以是宽依赖,例如前面给出的CoGroupedRDD。(www.xing528.com)

目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的,从源代码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类只有BlockStoreShuf-fleReader,因此本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。

源代码解析的第一步是查看该类的描述信息,具体如下。

从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此该方法是需要重点关注的源代码,一些关键的代码如下。

下面进一步解析数据读取的部分细节,首先是数据块获取,读取ShuffleBlockFetcherIter-ator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地结点或远程结点)分别进行读取,其中的关键代码如下。

与Hadoop一样,Spark计算框架也是基于数据本地性,即移动计算而非移动数据的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。

另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供。可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId,Seq[(BlockId,Long)])],其中BlockManagerId是BlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId,Long)])表示一组数据块标识ID及其数据块大小的元组信息。

最后简单分析一下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法为例给出解析,具体代码如下。

当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read方法中的第52行代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈