@rickyChen
2016-04-20T11:09:14.000000Z
字数 1884
阅读 2569
Spark
Yarn
我们将以一个Spark Streaming为例,简述Spark on Yarn客户端模式下作业提交流程。作业是通过spark-submit脚本提交的,因此整个流程从spark-submit代码开始分析。
submit
获取提交代码的MainClass
Utils.classForName
创建相关的类,并获取其中的mainMethod
mainMethod
,开始运行作业的main方法SparkConf
类,其中封装了Spark和Application相关配置信息。SparkConf
和批处理间隔做给参数创建一个StreamingContext
类StreamingContext
初始化的过程中,调用构造器,新建一个SparkContext
类.新建SparkContext
的过程中,有以下步骤需要关注(以下步骤按顺序执行): SparkConf
和listenerBus
为参数调用createSparkEnv
函数。其中,listenerBus
是spark中的监听器,包括JobProgressListener
。在createSparkEnv
调用的过程中,将调用SparkEnv
对象的createDriverEnv
成员函数,在这个过程中会创建一个actorSystem
和一个rpcEnv
,生成一个driver
,这将创建一个SparkEnv
对象,SparkEnv
对象中将封装诸如rpcEnv
,actorSystem
, cacheManager
, mapOutputTracker
, shuffleManager
, broadcastManager
, blockManager
, memoryManager
等成员类,成员类的作用如下: shuffleManager
会在driver和每个executor中创建,我们可以通过spark.shuffle.manager
来对shuffle进行配置,executor可以同过shuffleManager
接口读写数据。BlockManager
相应接口实现SparkStatusTracker
类方法将调用JobProgressListener
类中的成员变量,SparkStatusTracker
可以获得Application中Stage、Job的具体信息,但只提供最近几个Jobs/Stages信息。driver
上的一个类,负责接受来自executor
的心跳信息。createTaskScheduler
,返回两个对象:_schedulerBackend
、_taskScheduler
,并创建_dagScheduler
。DAGScheduler
初始化完成之后,将调用_taskScheduler.start()
,这一步主要进行了: ClientArguements
类,封装一些Application中需要的资源相关的配置信息。ClientArguements
为参数,新建一个Client
类Client.submitApplication
yarnClient
,从集群上申请一个Application,获取Application id,判断集群是否有足够资源,否则中断。向yarn集群申请一个Container运行ApplicationMaster
,最后把整个Application提交到Yarn集群上运行。