首页 理论教育 深入了解RDD的计算过程

深入了解RDD的计算过程

时间:2023-06-29 理论教育 版权反馈
【摘要】:RDD在进行计算前,Driver给Executor发送消息,让Executor启动Task,在Executor成功启动Task后,通过消息机制汇报启动成功信息给Driver。图2-6 RDD的计算过程具体操作如下。通过调用RDD的iterator方法进行计算,而iterator方法中进行的最终运算的方法是compute()。此时选择查看MapPartitionsRDD已经实现的compute方法,可以发现compute方法是通过f方法实现的,而f方法就是在创建MapPartitionsRDD时输入的操作函数。6)上述计算完成后,对数据进行传输。

深入了解RDD的计算过程

Spark中的Job是由具体的Task构成的,基于Spark程序内部的调度模式,根据宽依赖的关系划分不同的Stage,最后一个Stage依赖倒数第二个Stage等,从最后一个Stage获取结果;在Stage内部有一系列的任务,这些任务被提交到集群上的计算结点进行计算,计算结点执行计算逻辑时,复用位于Executor中的线程池中的线程,线程中运行的任务是调用具体Task的run方法进行计算,此时,如果调用具体Task的run方法,需要考虑不同Stage内部具体Task的类型,Spark规定最后一个Stage中的Task的类型称为resultTask,因为需要获取最后的结果,前面所有Stage的Task都是shuffleMapTask。

RDD在进行计算前,Driver给Executor发送消息,让Executor启动Task,在Executor成功启动Task后,通过消息机制汇报启动成功信息给Driver。如下图2-6所示。

978-7-111-55442-4-Chapter02-16.jpg

图2-6 RDD的计算过程

具体操作如下。

1)Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutorBackend发送LaunchTask消息。

2)首先反序列化TaskDescription(下述第11行)。

978-7-111-55442-4-Chapter02-17.jpg

978-7-111-55442-4-Chapter02-18.jpg

3)Executor会通过LaunchTask来执行Task(上述第15行)。

4)Executor的launchTask方法中通过taskRunner对象在threadPool运行具体的Task(下述代码第9行)。

978-7-111-55442-4-Chapter02-19.jpg

在taskRunner的run方法中首先会通过statusUpdate给Driver发信息汇报自己的状态,说明自己是running状态(下述代码第9行,第22行)。

同时TaskRunner内部会做一些准备工作,例如反序列化Task的依赖(下述代码第13行),通过网络获取需要的文件Jar等(下述代码第13行)。

之后反序列化Task本身(下述代码第15行)。

978-7-111-55442-4-Chapter02-20.jpg

978-7-111-55442-4-Chapter02-21.jpg

5)调用反序列化后的Task.run方法来执行任务,并获得执行结果(下述代码第5行)。

978-7-111-55442-4-Chapter02-22.jpg

978-7-111-55442-4-Chapter02-23.jpg(www.xing528.com)

通过查看task.run方法的源代码可以发现,run方法调用了runTask的方法,而runTask方法是一个抽象方法,runTask方法内部会调用RDD的iterator()方法,该方法就是针对当前Task所对应的Partition进行计算的关键所在,在Executor的内部会迭代Partition的元素并交给用户自定义的Function进行处理。

978-7-111-55442-4-Chapter02-24.jpg

Task有两个子类,分别是ShuffleMapTask和ResultTask,接下来分别对两者进行讲解:

①ShuffleMapTask。

978-7-111-55442-4-Chapter02-25.jpg

978-7-111-55442-4-Chapter02-26.jpg

首先,ShuffleMapTask会反序列化RDD及其依赖关系(上述代码第7行)。

(上述代码第20行)通过调用RDD的iterator方法进行计算,而iterator方法中进行的最终运算的方法是compute()。

978-7-111-55442-4-Chapter02-27.jpg

978-7-111-55442-4-Chapter02-28.jpg

而RDD的compute方法是一个抽象方法,每个RDD都需要重写上次方法。

此时选择查看MapPartitionsRDD已经实现的compute方法,可以发现compute方法是通过f方法实现的,而f方法就是在创建MapPartitionsRDD时输入的操作函数。

注意:通过迭代器的不断叠加,将每个RDD的小函数合并成一个大的函数流。

然后在计算具体的Partition之后,会通过shuffleManager获得的shuffleWriter把当前Task计算的结果根据具体的shuffleManager实现写入到具体的文件,操作完成之后会把MapStatus发送给Driver端的DAGScheduler的MapOutputTracker。

②ResultTask。Driver端的DAGSchueduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行Shuffle后产生整个Job最后的结果。

978-7-111-55442-4-Chapter02-29.jpg

978-7-111-55442-4-Chapter02-30.jpg

ResultTask的runTask方法中(第12行)反序列化生成func函数,最后通过func函数计算出最终结果。

6)上述计算完成后,对数据进行传输。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈