@rickyChen
2016-04-20T03:09:14.000000Z
字数 1884
阅读 2822
Spark Yarn
我们将以一个Spark Streaming为例,简述Spark on Yarn客户端模式下作业提交流程。作业是通过spark-submit脚本提交的,因此整个流程从spark-submit代码开始分析。
submit获取提交代码的MainClassUtils.classForName创建相关的类,并获取其中的mainMethodmainMethod,开始运行作业的main方法SparkConf类,其中封装了Spark和Application相关配置信息。SparkConf和批处理间隔做给参数创建一个StreamingContext类StreamingContext初始化的过程中,调用构造器,新建一个SparkContext类.新建SparkContext的过程中,有以下步骤需要关注(以下步骤按顺序执行): SparkConf和listenerBus为参数调用createSparkEnv函数。其中,listenerBus是spark中的监听器,包括JobProgressListener。在createSparkEnv调用的过程中,将调用SparkEnv对象的createDriverEnv成员函数,在这个过程中会创建一个actorSystem和一个rpcEnv,生成一个driver,这将创建一个SparkEnv对象,SparkEnv对象中将封装诸如rpcEnv,actorSystem, cacheManager, mapOutputTracker, shuffleManager, broadcastManager, blockManager, memoryManager 等成员类,成员类的作用如下: shuffleManager会在driver和每个executor中创建,我们可以通过spark.shuffle.manager来对shuffle进行配置,executor可以同过shuffleManager接口读写数据。BlockManager相应接口实现SparkStatusTracker类方法将调用JobProgressListener类中的成员变量,SparkStatusTracker可以获得Application中Stage、Job的具体信息,但只提供最近几个Jobs/Stages信息。driver上的一个类,负责接受来自executor的心跳信息。createTaskScheduler,返回两个对象:_schedulerBackend、_taskScheduler,并创建_dagScheduler。DAGScheduler初始化完成之后,将调用_taskScheduler.start(),这一步主要进行了: ClientArguements类,封装一些Application中需要的资源相关的配置信息。ClientArguements为参数,新建一个Client类Client.submitApplicationyarnClient,从集群上申请一个Application,获取Application id,判断集群是否有足够资源,否则中断。向yarn集群申请一个Container运行ApplicationMaster,最后把整个Application提交到Yarn集群上运行。