@w1992wishes
2018-10-28T14:30:23.000000Z
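Spark defines a trait, ListenerBus, that accepts events and delivers each event to the listeners registered for it.
The trait's main member is listeners, which keeps track of every registered listener. Its data structure is a thread-safe CopyOnWriteArrayList[L].
The trait also has a few main methods: addListener and removeListener register and unregister a listener, postToAll delivers an event to every registered listener, and doPostEvent, which each subclass implements, delivers an event to a single listener.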
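To make the shape of such a trait concrete, here is a minimal sketch in plain Scala. It is not Spark's source: MiniListenerBus, DemoEvent, JobStarted, JobEnded, DemoListener, and DemoBus are hypothetical names invented for this illustration, and error handling is reduced to printing on stderr.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for a listener bus (not Spark's class).
trait MiniListenerBus[L <: AnyRef, E] {
  // Thread-safe: iteration works on a snapshot, so listeners can be
  // added or removed while an event is being delivered.
  private val listeners = new CopyOnWriteArrayList[L]()

  final def addListener(listener: L): Unit = listeners.add(listener)

  final def removeListener(listener: L): Unit = listeners.remove(listener)

  // Deliver one event to every registered listener. A listener that throws
  // does not prevent delivery to the remaining listeners.
  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext) {
      val listener = iter.next()
      try doPostEvent(listener, event)
      catch { case NonFatal(e) => Console.err.println(s"Listener $listener threw: $e") }
    }
  }

  // Subclasses decide which listener callback matches which event.
  protected def doPostEvent(listener: L, event: E): Unit
}

// Hypothetical events and listener interface used only for this sketch.
sealed trait DemoEvent
case class JobStarted(id: Int) extends DemoEvent
case class JobEnded(id: Int) extends DemoEvent

trait DemoListener {
  def onJobStart(e: JobStarted): Unit = ()
  def onJobEnd(e: JobEnded): Unit = ()
}

class DemoBus extends MiniListenerBus[DemoListener, DemoEvent] {
  override protected def doPostEvent(listener: DemoListener, event: DemoEvent): Unit =
    event match {
      case e: JobStarted => listener.onJobStart(e)
      case e: JobEnded   => listener.onJobEnd(e)
    }
}
```

A listener only overrides the callbacks it cares about; events of other types are routed to the default no-op methods.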
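The event-bus inheritance hierarchy shown in the figure is from Spark 2.1.0; it differs slightly between versions.
Each ListenerBus delivers a different kind of Event to a different kind of Listener. The rest of this post focuses on LiveListenerBus.
LiveListenerBus extends SparkListenerBus. Unlike the other ListenerBus implementations, it puts every event into a queue, and a separate thread keeps taking events from that queue and delivering them to the listeners asynchronously, which is what lets the UI refresh its data in real time.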
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
  val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
  if (queueSize <= 0) {
    throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
  }
  queueSize
}

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .intConf
    .createWithDefault(10000)
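The effect of capping the queue can be seen with the JDK class alone. The sketch below is an illustration, not Spark code: BoundedQueueDemo and fill are hypothetical names, and string events stand in for SparkListenerEvent. It shows that offer on a full LinkedBlockingQueue returns false instead of blocking or growing the queue, which is what makes it possible to count dropped events rather than run out of memory.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical demo object; the names are not part of Spark.
object BoundedQueueDemo {
  // Offers `attempts` events to a queue capped at `capacity` without ever
  // draining it, and returns (accepted, dropped).
  def fill(capacity: Int, attempts: Int): (Int, Int) = {
    require(capacity > 0, "capacity must be > 0")
    val queue = new LinkedBlockingQueue[String](capacity)
    var accepted = 0
    var dropped = 0
    for (i <- 1 to attempts) {
      // offer() never blocks: it returns false when the queue is full.
      if (queue.offer(s"event-$i")) accepted += 1 else dropped += 1
    }
    (accepted, dropped)
  }
}
```

With a capacity of 3 and 5 offers, the first three events are accepted and the last two are rejected.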
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
private val logDroppedEvent = new AtomicBoolean(false)
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
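eventLock: a counter for the events produced into and consumed from the queue. A permit is released when a new event arrives and acquired when an event is about to be processed. It is created as new Semaphore(0).
listenerThread: the thread that processes events asynchronously.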
private val listenerThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    LiveListenerBus.withinListenerThread.withValue(true) {
      while (true) {
        eventLock.acquire()
        self.synchronized {
          processingEvent = true
        }
        try {
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          postToAll(event)
        } finally {
          self.synchronized {
            processingEvent = false
          }
        }
      }
    }
  }
}
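The code is not complicated. The main logic is as follows. The thread loops forever and, at the start of each iteration, acquires a permit from eventLock, blocking until one is available. It then sets processingEvent to true and polls an event from eventQueue. If the event is null, the bus has been stopped, so the thread returns (a null while stopped is still false is treated as an illegal state). Otherwise it calls postToAll to deliver the event to every listener. In either case, the finally block resets processingEvent to false.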
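The same loop can be reproduced in a small, self-contained sketch. MiniAsyncBus and its members are hypothetical names, string events replace SparkListenerEvent, and appending to delivered stands in for postToAll. The stop method is an assumption of this sketch: it releases one extra permit so that the worker polls null and exits, which matches the comment in the listing but is not shown in the code quoted in this post.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical miniature of the consumer side; not Spark's actual class.
class MiniAsyncBus(capacity: Int) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0) // one permit per queued event
  private val stopped = new AtomicBoolean(false)

  // Events handed to the "listeners", in delivery order.
  val delivered = new ConcurrentLinkedQueue[String]()

  private val worker = new Thread("mini-listener-bus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()       // blocks until post() or stop() releases
        val event = queue.poll()  // non-blocking; null means "woken by stop()"
        if (event == null) running = false
        else delivered.add(event) // stands in for postToAll(event)
      }
    }
  }

  def start(): Unit = worker.start()

  // Returns true if the event was queued, false if it was dropped.
  def post(event: String): Boolean = {
    if (stopped.get) return false
    val added = queue.offer(event)
    if (added) eventLock.release()
    added
  }

  // Assumed shutdown path: one extra permit lets the worker drain the
  // events queued so far, then poll null and exit.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()
      worker.join()
    }
  }
}
```

Because the semaphore holds exactly one permit per queued event, the worker never spins on an empty queue, and the single extra permit from stop is consumed only after all earlier events have been delivered.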
DAGScheduler, SparkContext, BlockManagerMasterEndpoint, DriverEndpoint, and LocalSchedulerBackend are all event sources for LiveListenerBus. Each of them hands its events to the asynchronous listenerThread by calling the post method of LiveListenerBus.
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get) {
    // Drop further events to make `listenerThread` exit ASAP
    logError(s"$name has already stopped! Dropping event $event")
    return
  }
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    onDropEvent(event)
    droppedEventsCounter.incrementAndGet()
  }

  val droppedEvents = droppedEventsCounter.get
  if (droppedEvents > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
          new java.util.Date(prevLastReportTimestamp))
      }
    }
  }
}
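The rate-limited reporting at the end of post can be isolated into a small sketch. DropReporter, recordDrop, and maybeReport are hypothetical names; the injected clock and the Option return value are assumptions added here so that the behavior can be checked deterministically, whereas the original code reads System.currentTimeMillis() and writes a warning to the log.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper; the injected clock and the returned Option are
// assumptions of this sketch, not part of Spark.
class DropReporter(intervalMs: Long, now: () => Long) {
  private val dropped = new AtomicLong(0L)
  @volatile private var lastReport = 0L

  def recordDrop(): Unit = dropped.incrementAndGet()

  // Returns Some(count) when this call wins the right to report and has
  // reset the counter to zero; returns None otherwise.
  def maybeReport(): Option[Long] = {
    val count = dropped.get
    if (count > 0 &&
        now() - lastReport >= intervalMs &&
        // The compare-and-set fails if another thread changed the counter
        // in the meantime, so at most one caller reports a given count.
        dropped.compareAndSet(count, 0L)) {
      lastReport = now()
      Some(count)
    } else {
      None
    }
  }
}
```

If the compare-and-set loses to a concurrent increment, nothing is lost: the counter still holds the drops, and a later call reports them.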
Figure: the overall flow of Spark's event bus.
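This post covers much the same ground as the event-bus chapter of 《spark内核设计的艺术架构设计与实现》, and the flow diagram is the same. I still took the time to write it down because that is how the material really sinks in. As the saying goes, "the palest ink is better than the best memory."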