Outline of this post: (outline figure omitted)
Spark defines a trait, ListenerBus, which accepts events and delivers them to the listeners registered for those events.
The trait has one main member, listeners, which keeps track of all registered listeners; its data structure is a thread-safe CopyOnWriteArrayList[L].
The trait also has a few core methods, sketched below:
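As a reference, here is a simplified, self-contained sketch of the trait, paraphrased from the Spark 2.1.x source; the real version mixes in Logging, marks members private[spark], and logs listener failures instead of printing them:

import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Simplified sketch of org.apache.spark.util.ListenerBus (abridged).
trait ListenerBus[L <: AnyRef, E] {
  // All registered listeners; CopyOnWriteArrayList makes add/remove/iteration thread-safe.
  protected val listeners = new CopyOnWriteArrayList[L]

  /** Register a listener; thread-safe, may be called from any thread. */
  final def addListener(listener: L): Unit = {
    listeners.add(listener)
  }

  /** Unregister a listener; thread-safe, may be called from any thread. */
  final def removeListener(listener: L): Unit = {
    listeners.remove(listener)
  }

  /** Deliver one event to every registered listener, isolating per-listener failures. */
  def postToAll(event: E): Unit = {
    val iter = listeners.iterator
    while (iter.hasNext) {
      val listener = iter.next()
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) => e.printStackTrace() // the real code logs the error and keeps going
      }
    }
  }

  /** How one event is dispatched to one listener; implemented by subclasses such as SparkListenerBus. */
  protected def doPostEvent(listener: L, event: E): Unit
}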


The figure above shows the inheritance hierarchy of the event bus in Spark 2.1.0; it differs slightly between versions.
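In outline (a hedged summary of the 2.1.0 source, with visibility modifiers and constructor parameters dropped), that hierarchy is roughly:

// Approximate shape of the 2.1.0 event-bus hierarchy (abridged, not the literal signatures).
trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent]
class LiveListenerBus extends SparkListenerBus    // asynchronous bus backing the live web UI
class ReplayListenerBus extends SparkListenerBus  // replays persisted event logs (History Server)
// Spark Streaming's StreamingListenerBus also builds directly on ListenerBus.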
Each ListenerBus is responsible for delivering a particular kind of Event to a particular kind of Listener. The rest of this post focuses on LiveListenerBus.
LiveListenerBus extends SparkListenerBus. Unlike the other buses, LiveListenerBus puts every event into a queue, and a separate thread keeps taking events off that queue and delivering them to the listeners asynchronously, which is what keeps the web UI data refreshed in near real time. Its key members are shown below:
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
  val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
  if (queueSize <= 0) {
    throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
  }
  queueSize
}

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .createWithDefault(10000)
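If events are being dropped because the queue fills up faster than listeners drain it, the capacity can be raised with the configuration key shown above (default 10000). A minimal sketch; the application name and value here are just illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Raise the listener-bus queue capacity from the default 10000 to 20000.
val conf = new SparkConf()
  .setAppName("listener-bus-demo")                             // illustrative app name
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000") // must be > 0
val sc = new SparkContext(conf)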
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false

private val logDroppedEvent = new AtomicBoolean(false)

// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
eventLock: a semaphore that counts the events produced into and consumed from the queue; a permit is released whenever a new event arrives and acquired before an event is taken out for processing (eventLock = new Semaphore(0));
listenerThread: the thread that processes events asynchronously:
private val listenerThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    LiveListenerBus.withinListenerThread.withValue(true) {
      while (true) {
        eventLock.acquire()
        self.synchronized {
          processingEvent = true
        }
        try {
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          postToAll(event)
        } finally {
          self.synchronized {
            processingEvent = false
          }
        }
      }
    }
  }
}
The code is not complicated. The main logic of listenerThread is:
1. eventLock.acquire() blocks until at least one permit is available, i.e. until some event has been posted (each successful post releases one permit).
2. The thread then polls eventQueue. A null result only happens after stop() has released one extra permit against an already drained queue, so the thread exits; polling null while stopped is still false is treated as an illegal state.
3. Otherwise it calls postToAll(event) to dispatch the event to every registered listener, with processingEvent (guarded by self.synchronized) marking that an event is currently being handled.
DAGScheduler, SparkContext, BlockManagerMasterEndpoint, DriverEndpoint, and LocalSchedulerBackend are all event sources of LiveListenerBus: each of them calls LiveListenerBus.post to hand its events over to the asynchronous listenerThread.
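On the receiving side, application code can tap into the same bus by registering a SparkListener on the driver; SparkContext.addSparkListener hands the listener to LiveListenerBus. A small sketch, where the listener class name is invented for illustration and sc is an existing SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative listener that logs job starts; any other SparkListener callback works the same way.
class JobStartLogger extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
  }
}

// Register with the driver's LiveListenerBus; events posted by the sources above
// will now also be delivered to this listener by listenerThread.
sc.addSparkListener(new JobStartLogger)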
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get) {
    // Drop further events to make `listenerThread` exit ASAP
    logError(s"$name has already stopped! Dropping event $event")
    return
  }
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    onDropEvent(event)
    droppedEventsCounter.incrementAndGet()
  }

  val droppedEvents = droppedEventsCounter.get
  if (droppedEvents > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
          new java.util.Date(prevLastReportTimestamp))
      }
    }
  }
}
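Putting the producer side (post) and the consumer side (listenerThread) together, the mechanism boils down to a bounded queue, a counting semaphore, and a daemon thread. Below is a minimal, self-contained sketch of that pattern; it is not Spark's code, and the names (TinyAsyncBus, handle, ...) are invented for illustration:

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Minimal illustration of the LiveListenerBus pattern: bounded queue + semaphore + daemon consumer.
class TinyAsyncBus[E <: AnyRef](capacity: Int)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val eventLock = new Semaphore(0)   // one permit per undelivered event
  private val stopped = new AtomicBoolean(false)

  private val consumer = new Thread("tiny-async-bus") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        eventLock.acquire()                  // block until an event (or the stop signal) is available
        val event = queue.poll()
        if (event == null) return            // the extra permit from stop(): queue drained, exit
        handle(event)
      }
    }
  }
  consumer.start()

  /** Producer side: enqueue and release one permit; returns false (event dropped) if the queue is full. */
  def post(event: E): Boolean = {
    if (stopped.get()) return false
    val added = queue.offer(event)
    if (added) eventLock.release()
    added
  }

  /** Release one extra permit so the consumer, after draining the queue, polls null and exits. */
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) eventLock.release()
  }
}

Here new TinyAsyncBus[String](16)(s => println(s"got $s")) plays the role of LiveListenerBus, post stands in for the event sources, and handle stands in for postToAll; the real bus adds the dropped-event accounting, the started/stopped handshake, and metrics on top of this skeleton.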
To wrap up, the rough flow of Spark's event bus: the event sources call post to put events into the bounded queue, and listenerThread keeps draining that queue and calling postToAll, which delivers each event to every registered listener.

The content of this post is largely the same as the event-bus chapter of 《spark内核设计的艺术架构设计与实现》 (The Art of Spark Kernel Design: Architecture and Implementation), and the flow diagram is identical. I still took the time to write it down because doing so makes the material sink in; as the saying goes, the palest ink beats the best memory.