@w1992wishes 2018-10-28T14:30:23.000000Z

【Spark】Spark Event Bus

spark


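Outline of this post:

1. Event Bus Overview
2. The ListenerBus Inheritance Hierarchy
3. LiveListenerBus in Detail
4. Summary of the Flow
5. References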

1. Event Bus Overview

Spark defines a trait, ListenerBus, which accepts events and delivers each one to the corresponding listeners.

The trait's main member is `listeners`, which holds every registered listener. Its data structure is a thread-safe CopyOnWriteArrayList[L].

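The trait also has several key methods:

- `addListener(listener: L)`: registers a listener.
- `removeListener(listener: L)`: unregisters a listener.
- `postToAll(event: E)`: delivers an event to every registered listener by calling `doPostEvent` for each one. If a listener throws a non-fatal exception, the error is logged and the remaining listeners still receive the event.
- `doPostEvent(listener: L, event: E)`: abstract. Each subclass implements it to call the listener callback that matches the event.
- `findListenersByClass[T <: L : ClassTag](): Seq[T]`: returns the registered listeners of a given class.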
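The following minimal Scala sketch shows the shape of the trait. It is a simplified stand-in, not Spark's source. The names `SimpleListenerBus`, `MessageListener` and `MessageBus` are hypothetical. Spark's `Logging` is replaced by printing to stderr, and only the registration and delivery logic is kept.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for Spark's ListenerBus[L, E]:
// L is the listener type, E the event type.
trait SimpleListenerBus[L <: AnyRef, E] {
  // Iterators over a CopyOnWriteArrayList see a snapshot, so listeners can be
  // added or removed while an event is being delivered.
  val listeners = new CopyOnWriteArrayList[L]

  final def addListener(listener: L): Unit = { listeners.add(listener); () }

  final def removeListener(listener: L): Unit = { listeners.remove(listener); () }

  // Deliver one event to every listener. A listener that throws is reported
  // and skipped, so the remaining listeners still receive the event.
  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext()) {
      val listener = iter.next()
      try doPostEvent(listener, event)
      catch {
        case NonFatal(e) => System.err.println(s"Listener $listener threw an exception: $e")
      }
    }
  }

  // Bus-specific: call the listener callback that matches this event.
  protected def doPostEvent(listener: L, event: E): Unit
}

// Hypothetical listener type and bus, used only for this demo.
trait MessageListener { def onMessage(msg: String): Unit }

class MessageBus extends SimpleListenerBus[MessageListener, String] {
  protected def doPostEvent(listener: MessageListener, event: String): Unit =
    listener.onMessage(event)
}

val bus = new MessageBus
val seen = scala.collection.mutable.ArrayBuffer.empty[String]
bus.addListener(new MessageListener { def onMessage(m: String): Unit = { seen += m; () } })
bus.addListener(new MessageListener { def onMessage(m: String): Unit = throw new RuntimeException("boom") })
bus.postToAll("hello")
```

The second listener's exception is caught inside `postToAll`. The first listener still records `"hello"`, and both listeners stay registered.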

2. The ListenerBus Inheritance Hierarchy

The figure above shows the inheritance hierarchy of the event buses in Spark 2.1.0. The hierarchy differs slightly between versions.

Each ListenerBus delivers its own kind of event to its own kind of listener. The rest of this post focuses on LiveListenerBus.
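The sketch below follows the approach SparkListenerBus uses to route events: its `doPostEvent` pattern-matches on the event type and calls the matching callback on the listener. The event and listener types (`DemoEvent`, `JobStart`, `JobEnd`, `DemoListener`) are hypothetical. They are loosely modelled on `SparkListenerEvent` and `SparkListenerInterface`, which have many more cases.

```scala
// Hypothetical event hierarchy, loosely modelled on SparkListenerEvent.
sealed trait DemoEvent
final case class JobStart(jobId: Int) extends DemoEvent
final case class JobEnd(jobId: Int) extends DemoEvent

// One callback per event type, all no-ops by default, so a listener
// overrides only the callbacks it cares about.
trait DemoListener {
  def onJobStart(event: JobStart): Unit = ()
  def onJobEnd(event: JobEnd): Unit = ()
}

// The bus-specific routing: match on the event type, call the matching callback.
def doPostEvent(listener: DemoListener, event: DemoEvent): Unit = event match {
  case e: JobStart => listener.onJobStart(e)
  case e: JobEnd   => listener.onJobEnd(e)
}

// Usage: a listener that only reacts to job starts.
val startedJobs = scala.collection.mutable.ArrayBuffer.empty[Int]
val jobStartListener = new DemoListener {
  override def onJobStart(event: JobStart): Unit = { startedJobs += event.jobId; () }
}
Seq(JobStart(1), JobEnd(1), JobStart(2)).foreach(e => doPostEvent(jobStartListener, e))
```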

3. LiveListenerBus in Detail

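LiveListenerBus extends SparkListenerBus. It differs from the other ListenerBus implementations in how it delivers events. It puts every event into a queue, and a separate thread keeps taking events from that queue and delivers them to the listeners asynchronously. This keeps data such as the web UI updated in near real time.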
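The asynchronous design can be illustrated with a small, self-contained sketch. Here `post` only enqueues the event and returns, while a single daemon thread drains the queue and calls the listeners. `AsyncBus` is a hypothetical name. Shutdown uses a "poison pill" object, which is a swapped-in technique: LiveListenerBus itself signals shutdown with a `Semaphore` and a `stopped` flag.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical minimal asynchronous bus: post() only enqueues, and one daemon
// thread drains the queue and calls the listeners.
// Assumption: shutdown uses a poison-pill object, not Spark's Semaphore + flag.
final class AsyncBus[E <: AnyRef](capacity: Int) {
  private val poisonPill: AnyRef = new Object
  private val queue = new LinkedBlockingQueue[AnyRef](capacity)
  private val listeners = new CopyOnWriteArrayList[E => Unit]

  private val worker = new Thread("async-bus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        val item = queue.take() // blocks until something is queued
        if (item eq poisonPill) running = false
        else {
          val it = listeners.iterator()
          while (it.hasNext()) {
            val listener = it.next()
            listener(item.asInstanceOf[E])
          }
        }
      }
    }
  }

  def addListener(listener: E => Unit): Unit = { listeners.add(listener); () }
  def start(): Unit = worker.start()
  // Returns immediately; false means the bounded queue was full.
  def post(event: E): Boolean = queue.offer(event)
  // Enqueue the pill behind all pending events, then wait for the worker to finish.
  def stop(): Unit = { queue.put(poisonPill); worker.join() }
}

val delivered = new ConcurrentLinkedQueue[String]
val bus = new AsyncBus[String](capacity = 16)
bus.addListener(e => { delivered.add(e); () })
bus.start()
val accepted = Seq("a", "b", "c").map(e => bus.post(e))
bus.stop()
```

Only one thread delivers events, so each listener sees them in the order they were posted. A slow listener delays delivery for every listener, but it never blocks the caller of `post`.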

3.1 Fields of LiveListenerBus:

    // Cap the capacity of the event queue so we get an explicit error (rather than
    // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
    private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
    private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

    private def validateAndGetQueueSize(): Int = {
      val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
      if (queueSize <= 0) {
        throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
      }
      queueSize
    }

    // Defined in Spark's internal config package object (org.apache.spark.internal.config)
    private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
      ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
        .intConf
        .createWithDefault(10000)
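The capped queue makes overload visible instead of letting it exhaust memory. The sketch below assumes a capacity of 2 instead of the default 10000. `validateQueueSize` is a hypothetical stand-in for `validateAndGetQueueSize`. It uses `require`, which throws `IllegalArgumentException`, where Spark throws `SparkException`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.Try

// Hypothetical helper mirroring validateAndGetQueueSize(): reject a
// non-positive size up front (require() throws IllegalArgumentException).
def validateQueueSize(size: Int): Int = {
  require(size > 0, "spark.scheduler.listenerbus.eventqueue.size must be > 0!")
  size
}

// A bounded LinkedBlockingQueue never grows past its capacity: once it is full,
// offer() returns false right away instead of blocking or growing the heap.
val queue = new LinkedBlockingQueue[String](validateQueueSize(2))
val offered = Seq("e1", "e2", "e3").map(e => queue.offer(e))
val invalidSize = Try(validateQueueSize(0))
```

In LiveListenerBus, a `false` from `offer` is the point where an event is dropped and counted.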

    // Indicate if `start()` is called
    private val started = new AtomicBoolean(false)
    // Indicate if `stop()` is called
    private val stopped = new AtomicBoolean(false)

    /** A counter for dropped events. It will be reset every time we log it. */
    private val droppedEventsCounter = new AtomicLong(0L)
    /** When `droppedEventsCounter` was logged last time in milliseconds. */
    @volatile private var lastReportTimestamp = 0L

    // Indicate if we are processing some event
    // Guarded by `self`
    private var processingEvent = false

    // Set after the first dropped event is logged, so onDropEvent logs that error only once
    private val logDroppedEvent = new AtomicBoolean(false)

    // A counter that represents the number of events produced and consumed in the queue
    private val eventLock = new Semaphore(0)
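`eventLock` starts with zero permits and acts as a counter of queued events. The producer releases one permit per event it enqueues, and the consumer acquires one permit before each poll. The single-threaded sketch below shows that bookkeeping in isolation. `produce` and `consume` are hypothetical helpers, not Spark methods.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

// The semaphore's permit count equals "events offered but not yet taken".
val eventQueue = new LinkedBlockingQueue[String](10)
val eventLock = new Semaphore(0)

// Producer side (like post): enqueue first, then release one permit.
def produce(event: String): Unit =
  if (eventQueue.offer(event)) eventLock.release()

// Consumer side (like the listener thread): take a permit, then poll.
// Every event is queued before its permit is released, so poll() only
// returns null when a permit was released with no event behind it.
def consume(): Option[String] = {
  eventLock.acquire()
  Option(eventQueue.poll())
}

produce("a")
produce("b")
val permitsQueued = eventLock.availablePermits()
val first = consume()
val second = consume()
eventLock.release() // a bare permit with no event: the "stop" signal
val afterStop = consume()
```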

3.2 The Asynchronous Event-Processing Thread

    private val listenerThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
        LiveListenerBus.withinListenerThread.withValue(true) {
          while (true) {
            eventLock.acquire()
            self.synchronized {
              processingEvent = true
            }
            try {
              val event = eventQueue.poll
              if (event == null) {
                // Get out of the while loop and shutdown the daemon thread
                if (!stopped.get) {
                  throw new IllegalStateException("Polling `null` from eventQueue means" +
                    " the listener bus has been stopped. So `stopped` must be true")
                }
                return
              }
              postToAll(event)
            } finally {
              self.synchronized {
                processingEvent = false
              }
            }
          }
        }
      }
    }

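The code is not complicated. Its main logic is:

- `listenerThread` is a daemon thread, so it does not keep the JVM alive on its own.
- The loop runs inside `Utils.tryOrStopSparkContext(sparkContext)`. If an uncaught error escapes, it is logged and the SparkContext is stopped. `LiveListenerBus.withinListenerThread` is set to `true` for the whole loop, which marks code running on the listener thread.
- Each iteration calls `eventLock.acquire()` and blocks until a permit is available, that is, until `post` has queued an event or the bus is being stopped.
- It sets `processingEvent = true` while holding the `self` lock, then polls one event from `eventQueue`.
- A `null` event means the bus has been stopped. The thread checks that `stopped` is true, throwing `IllegalStateException` otherwise, and returns, which ends the loop.
- Otherwise it calls `postToAll(event)`, which hands the event to every registered listener.
- In the `finally` block it resets `processingEvent = false` under the same lock, whether or not delivery succeeded.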
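The listener-thread protocol above can be exercised outside Spark with a trimmed-down sketch. `MiniLiveBus` is hypothetical. It keeps the queue, the semaphore, the `stopped` flag and the `processingEvent` flag. It drops `Utils.tryOrStopSparkContext`, `withinListenerThread` and the listener list; events go to a single `deliver` callback instead. Its `stop()` follows what Spark 2.x's `LiveListenerBus.stop()` does: it sets `stopped`, releases one extra permit so the thread eventually polls `null`, then joins the thread.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, trimmed-down LiveListenerBus with the same queue/semaphore/flag protocol.
final class MiniLiveBus(deliver: String => Unit) { self =>
  private val eventQueue = new LinkedBlockingQueue[String](100)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)
  private var processingEvent = false // guarded by `self`

  private val listenerThread = new Thread("mini-live-bus") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        eventLock.acquire() // wait for an event, or for stop()'s extra permit
        self.synchronized { processingEvent = true }
        try {
          val event = eventQueue.poll()
          if (event == null) {
            // A permit with no event behind it: stop() was called, so exit.
            if (!stopped.get()) throw new IllegalStateException("polled null but not stopped")
            return
          }
          deliver(event)
        } finally {
          self.synchronized { processingEvent = false }
        }
      }
    }
  }

  def start(): Unit = listenerThread.start()

  // Enqueue first, then publish a permit; events posted after stop() are ignored.
  def post(event: String): Unit =
    if (!stopped.get() && eventQueue.offer(event)) eventLock.release()

  // Set the flag, release one extra permit, then wait for the thread to exit.
  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()
      listenerThread.join()
    }

  def isProcessing: Boolean = self.synchronized(processingEvent)
}

val delivered = new ConcurrentLinkedQueue[String]
val bus = new MiniLiveBus(e => { delivered.add(e); () })
bus.start()
Seq("job-start", "stage-submitted", "job-end").foreach(e => bus.post(e))
bus.stop()
```

There is one permit per queued event plus the single extra permit from `stop()`. The thread therefore polls every pending event before it finally sees `null`.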

3.3 Where the Listener Thread's Events Come From

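DAGScheduler, SparkContext, BlockManagerMasterEndpoint, DriverEndpoint and LocalSchedulerBackend are all event sources for LiveListenerBus. Each of them calls LiveListenerBus's `post` method. `post` puts the event on the queue, and the asynchronous `listenerThread` processes it from there.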

    def post(event: SparkListenerEvent): Unit = {
      if (stopped.get) {
        // Drop further events to make `listenerThread` exit ASAP
        logError(s"$name has already stopped! Dropping event $event")
        return
      }
      val eventAdded = eventQueue.offer(event)
      if (eventAdded) {
        eventLock.release()
      } else {
        onDropEvent(event)
        droppedEventsCounter.incrementAndGet()
      }

      val droppedEvents = droppedEventsCounter.get
      if (droppedEvents > 0) {
        // Don't log too frequently
        if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
          // There may be multiple threads trying to decrease droppedEventsCounter.
          // Use "compareAndSet" to make sure only one thread can win.
          // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
          // then that thread will update it.
          if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
            val prevLastReportTimestamp = lastReportTimestamp
            lastReportTimestamp = System.currentTimeMillis()
            logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
              new java.util.Date(prevLastReportTimestamp))
          }
        }
      }
    }
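The drop-reporting half of `post` is the subtle part. The counter is reset with `compareAndSet`, so two concurrent callers cannot both report the same drops. A warning goes out at most once every 60 seconds. The sketch below isolates that logic. `DropReporter`, the injected clock `now` and the `warnings` list, which stands in for `logWarning`, are all hypothetical. The `stopped` check and the `onDropEvent` call are left out.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch of post()'s drop accounting. The clock `now` is injected
// (an assumption; Spark calls System.currentTimeMillis() directly) so the
// 60-second rate limit can be exercised without sleeping.
final class DropReporter(capacity: Int, now: () => Long) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val droppedEventsCounter = new AtomicLong(0L)
  @volatile private var lastReportTimestamp = 0L
  val warnings = new CopyOnWriteArrayList[String] // stands in for logWarning

  def post(event: String): Unit = {
    if (!eventQueue.offer(event)) droppedEventsCounter.incrementAndGet()
    val droppedEvents = droppedEventsCounter.get()
    if (droppedEvents > 0 && now() - lastReportTimestamp >= 60 * 1000) {
      // Only a thread whose compareAndSet succeeds resets the counter and reports.
      // If another thread incremented the counter in between, the CAS fails and
      // that thread gets its own chance to report the newer count.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        lastReportTimestamp = now()
        warnings.add(s"Dropped $droppedEvents events")
      }
    }
  }
}

var clock = 100000L
val reporter = new DropReporter(capacity = 1, now = () => clock)
reporter.post("a") // accepted
reporter.post("b") // dropped and reported at once (no earlier report)
reporter.post("c") // dropped, but within 60 s of the last report: only counted
clock += 60000L
reporter.post("d") // dropped; 60 s have passed, so the 2 pending drops are reported
```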

4. Summary of the Flow

The following figure summarizes the overall flow of Spark's event bus:

5. References

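This post covers much the same material as the event-bus chapter of 《Spark内核设计的艺术：架构设计与实现》 (The Art of Spark Kernel Design: Architecture Design and Implementation), and the flowchart is the same. I still took the time to write it up because writing things down makes them stick. As the saying goes, "a good memory is no match for a worn pen."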
