首页 理论教育 Worker部署步骤详解

Worker部署步骤详解

时间:2023-06-29 理论教育 版权反馈
【摘要】:Spark中的各个组件是通过脚本来启动部署的,为了解密Worker的部署,以启动Worker的脚本为切入点开始分析。部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式来启动集群中在该文件中列出的全部结点上的Worker实例。可以看到,Worker伴生对象中的main方法的格式和Master基本一致。最终会实例化一个Worker对象。

Worker部署步骤详解

Spark中的各个组件是通过脚本来启动部署的,为了解密Worker的部署,以启动Worker的脚本为切入点开始分析。

部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式来启动集群中在该文件中列出的全部结点上的Worker实例。启动组件的命令如下。

另一种方式是动态地在某个新增结点上(注意:是新增结点,如果之前已经部署过的话,可以参考后面对启动多个实例的进一步分析)启动一个Worker实例,此时可以在该新增的结点上执行如下启动命令。

其中,参数MasterURL表示当前集群中Master的监听地址,启动后Worker会通过该地址动态注册到Master组件,从而实现为集群动态添加Worker结点的目的。

1.Worker部署脚本的解析

部署脚本根据单个结点及多个结点的Worker部署,对应有两个脚本:start-slave.sh和start-slaves.sh。其中start-slave.sh负责在脚本执行结点启动一个Worker组件,start-slaves.sh脚本则会读取配置的conf/slaves文件,逐个启动集群中各个Slave结点上的Worker组件。

(1)首先分析脚本start-slaves.sh

脚本start-slaves.sh提供了批量启动集群中各个Slave结点上的Worker组件的方法。即,可以在配置好Slave结点(即配置好conf/slaves文件)之后,通过该脚本一次性全部启动集群中的Worker组件。

脚本的代码如下。

其中,脚本slaves.sh通过SSH协议在指定的各个Slave结点上执行各种命令,代码比较简单,建议大家自行查看。

在ssh启动的start-slave.sh命令中,可以看到它的参数是"spark://$SPARK_MASTER IP:$SPARK_MASTER_PORT",这实际上就是Master URL的值的拼接代码。

(2)继续分析脚本start-slave.sh

从前面start-slaves.sh脚本的分析可以看到,最终是在各个Slave结点上执行start-slave.sh脚本来部署Worker组件的,相应的,就可以通过该脚本动态地为集群添加新的Worker组件。

脚本的代码如下。

在手动启动Worker实例时,如果需要在一个结点上部署多个Worker组件,则需要配置SPARK WORKER INSTANCES环境变量,否则多次启动脚本部署Worker组件时会报错,其原因在于spark-daemon.sh脚本的执行控制,这里给出关键代码的简单分析。

首先脚本中有实例是否已经运行的判断,代码如下。

其中,记录对应实例的PID的文件相关代码如下。

从上面的分析可以看出,如果不是通过设置SPARK_WORKER_INSTANCES,然后一次性启动多个Worker实例,而是手动一个个地启动的话,对应的,在脚本中每次启动时的实例编号都是1,在后台守护进程的spark-daemon.sh脚本中生成的pid就是同一个文件,因此第二次启动时,pid文件已经存在,此时就会报错(对应停止时也是通过读取pid文件获取进程ID的,因此自动停止多个实例的话,也需要设置SPARK_WORKER_INSTANCES)。(www.xing528.com)

2.Worker的源代码解析

首先查看Worker伴生对象中的main方法,代码如下。

可以看到,Worker伴生对象中的main方法的格式和Master基本一致。通过参数的类型WorkerArguments来解析命令行参数,具体的代码解析可以参考Master结点部署时的Master-Arguments的代码解析。

另外,MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。

在解析完Worker的参数之后,调用startRpcEnvAndEndpoint方法启动RPC通信环境及Worker的RPC通信终端。该方法的代码解析可以参考Master结点部署时使用的同名方法的代码解析。

最终会实例化一个Worker对象。Worker也是继承ThreadSafeRpcEndpoint,对应的也是一个RPC的通信终端,实例化该对象后会调用onStart方法,该方法的代码如下。

其中,createWorkDir()方法对应构建了该Worker结点上的工作目录,后续在该结点上执行的Application相关信息都会存放在该目录下。代码如下。

可以看到如果没有设置workDirPath,默认使用的是sparkHome目录下的work子目录。对应的workDirPath在Worker实例化时传入,反推代码可以查到该变量在WorkerArguments中设置。相关代码有两处,一处在WorkerArguments的主构造体中,代码如下。

即workDirPath由环境变量"SPARK_WORKER_DIR"设置。

另外一处在命令行选项解析时设置,代码如下。

即workDirPath由启动Worker实例时传入的可选项"--work-dir"设置。

属性配置:通常由命令可选项来动态设置启动时的配置属性,此时配置的优先级高于默认的属性文件及环境变量中设置的属性。

启动Worker后的一个关键步骤就是注册到Master,对应的方法registerWithMaster()的代码如下。

继续查看tryRegisterAllMasters()方法,代码如下。

其中registerWithMaster(masterEndpoint)向特定Master的RPC通信终端发送消息,并且在接收到反馈消息后,进一步调用handleRegisterResponse方法进行处理。对应的处理代码如下。

分析到这一步,已经明确了注册及对注册的反馈信息的处理细节,下面来进一步分析注册重试定时器的相关处理,注册重试定时器会定期向Worker本身发送ReregisterWithMaster消息,因此可以在receive方法中查看该消息的处理,具体代码如下。

最终会调用registerWithMaster方法。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈