首页 理论教育 DAGScheduler划分Stage的具体算法

DAGScheduler划分Stage的具体算法

时间:2023-06-29 理论教育 版权反馈
【摘要】:理解了前面DAGScheduler划分Stage的原理后,现在结合Spark源代码来看看Spark是如何实现DAGScheduler划分Stage算法的。DAG-SchedulerEventProcessLoop在doOnReceive方法中反过来调用了DAGScheduler中实现划分Stage算法很关键的handleJobSubmitted方法。DAGScheduler的handle-JobSubmitted方法中最后调用submitStage方法,根据RDD的依赖关系,递归提交所有的Stage。DAGScheduler的getParentStagesAndId方法的代码如下。可以说,DAGScheduler的getParentStages方法是划分Stage的核心实现。至此,整个DAGScheduler划分Stage的过程已经介绍完毕。Stage划分完成后,DAG-Scheduler就会回到HandleJobSubmitted方法中调用submitStage方法。

DAGScheduler划分Stage的具体算法

理解了前面DAGScheduler划分Stage的原理后,现在结合Spark源代码来看看Spark是如何实现DAGScheduler划分Stage算法的。

RDD的Action操作算子如count会触发一个Spark Job,其实是RDD的count方法调用了SparkContext的runJob方法。然后,在SparkContext的runJob方法中调用3次重载的runJob方法。最后被调用的重载runJob方法中调用了DAGScheduler的runJob方法,从而进入DAG-Scheduler的源代码。

在DAGScheduler的源代码中,DAGScheduler的runJob方法中调用submitJob方法。sub-mitJob方法中最重要的是创建一个JobWaiter对象,以及创建JobSubmitted事件对象把Job-Waiter对象发送给DAGScheduler的内嵌类DAGSchedulerEventProcessLoop对象实例。DAG-SchedulerEventProcessLoop在doOnReceive方法中反过来调用了DAGScheduler中实现划分Stage算法很关键的handleJobSubmitted方法。

DAGScheduler的handleJobSubmitted方法中调用newResultStage方法,newResultStage方法根据finalRDD创建finalStage,这时便真正开始了Stage的划分。DAGScheduler的handle-JobSubmitted方法中最后调用submitStage方法,根据RDD的依赖关系,递归提交所有的Stage。

DAGScheduler的handleJobSubmitted方法关键的代码如下。

DAGScheduler的newResultStage方法中调用getParentStagesAndId方法得到ResultStage所有依赖的Parent Stage列表,以及ResultStage的Stage ID。

DAGScheduler的newResultStage方法的代码如下。

DAGScheduler的getParentStagesAndId方法比较简单,主要是调用getParentStages方法,其他就是得到一个当前最大的Stage ID。

DAGScheduler的getParentStagesAndId方法的代码如下。(www.xing528.com)

可以说,DAGScheduler的getParentStages方法是划分Stage的核心实现。这个方法的输出是获取或创建给定的RDD在Job中的所有Parent Stage列表。在该方法中,Stage的划分由最后一个Result Stage开始,从后往前回溯边划分边创建。给定的RDD会先被压入Stack,以便被逐个访问。对每个被访问的RDD,判断其Dependency是否是ShuffleDepen-dency宽依赖:如果是,就调用getShuffleMapStage方法获取或创建Stage,并放到一个临时HashSet中,便于统一最后转为列表返回;如果不是,就把Dependency的RDD也压入Stack中,继续判断该RDD的Dependency。方法的结束标志是Stack中所有压入的临时RDD都被访问过。

DAGScheduler的getParentStages方法的代码如下。

剩下的过程就比较容易理解了,DAGScheduler的getShuffleMapStage方法先根据给定的ShuffleDependency的shuffleId在shuffleToMapStage这个HaskMap中获取相应的ShuffleMap-Stage,若没有找到,就创建一个新的Stage返回。

DAGScheduler的getAncestorShuffleDependencies方法的核心逻辑是返回在shuffleToMap-Stage HashMap中不存在对应的Stage的ShuffleDependency,代码如下。

DAGScheduler的newOrUsedShuffleStage方法的代码如下。

至此,整个DAGScheduler划分Stage的过程已经介绍完毕。Stage划分完成后,DAG-Scheduler就会回到HandleJobSubmitted方法中调用submitStage方法。在submitStage方法中,从finalStage(ResultStage对象实例)开始回溯,直到没有Parent Stage为止,提交整个Job的所有Stage,在某一个Stage,DAGScheduler会调用submitMissingTasks方法把Tasks提交给TaskScheduler进行细粒度的Task调度。

下面介绍在Stage内部Task获取最佳位置的算法。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈