@MiloXia
2015-10-23T09:30:48.000000Z
字数 12433
阅读 6726
akka
ActorSystem初始化时
//读取配置文件创建dispatchers(默认的配置文件只有一个dispatcher)
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
//返回默认的dispatcher (可在配置里更改)
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
new Dispatchers(...)中
//读取配置文件
val defaultDispatcherConfig: Config =
idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
配置文件为:
default-dispatcher {
executor=thread-pool-executor.
type = "Dispatcher"
executor = "default-executor"
default-executor {
#默认为forkjoinpool jdk7才支持(误,看批注)
fallback = "fork-join-executor"
}
# forkjoinpool默认线程数 max(min(cpu线程数 * 3.0, 64), 8)
fork-join-executor {
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
}
......
}
ActorSystem中
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
中的defaultGlobalDispatcher方法
def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
lookup调用了lookupConfigurator(id).dispatcher()
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
case null ⇒
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap.
val newConfigurator =
if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
case null ⇒ newConfigurator
case existing ⇒ existing
}
case existing ⇒ existing
}
}
主要是configuratorFrom方法
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
case "BalancingDispatcher" ⇒
// FIXME remove this case in 2.4
throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
classOf[BalancingDispatcherConfigurator].getName)
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn ⇒
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
case exception ⇒
throw new ConfigurationException(
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
"make sure it has constructor with [com.typesafe.config.Config] and " +
"[akka.dispatch.DispatcherPrerequisites] parameters")
.format(fqn, cfg.getString("id")), exception)
}).get
}
}
根据配置"type"为"Dispatcher" 则为new DispatcherConfigurator
DispatcherConfigurator类创建了
private val instance = new Dispatcher(
this,
config.getString("id"),
config.getInt("throughput"),
config.getNanosDuration("throughput-deadline-time"),
configureExecutor(),
config.getMillisDuration("shutdown-timeout"))
/**
* Returns the same dispatcher instance for each invocation
*/
override def dispatcher(): MessageDispatcher = instance
来看Dispatcher类 "throughput"和"throughput-deadline-time"干嘛用后面会说
这类定义了dispatch方法用来执行mailbox[后面会说到]还有createMailbox创建mailbox
其父类MessageDispatcher定义了将actor加入dispatcher的attach方法
new Dispatcher 类时的configureExecutor()方法创建了forkjoinpool
def configureExecutor(): ExecutorServiceConfigurator = {
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn ⇒
val args = List(
classOf[Config] -> config,
classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
case exception ⇒ throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}).get
}
config.getString("executor") match {
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other ⇒ configurator(other)
}
}
根据配置直接走new ForkJoinExecutorConfigurator 看看这个类
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
}
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory ⇒
// add the dispatcher id to the thread names
m.withName(m.name + "-" + id)
case other ⇒ other
}
new ForkJoinExecutorServiceFactory(
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")))
}
new ForkJoinExecutorServiceFactory 提供了线程池的factory
读取了几个计算初始线程池大小的参数 会创建 AkkaForkJoinPool
继续回到configureExecutor()方法的
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator
来看看DefaultExecutorServiceConfigurator
val provider: ExecutorServiceFactoryProvider =
prerequisites.defaultExecutionContext match {
case Some(ec) ⇒
prerequisites.eventStream.publish(....)
new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
def createExecutorService: ExecutorService = this
def shutdown(): Unit = ()
def isTerminated: Boolean = false
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
def execute(command: Runnable): Unit = ec.execute(command)
def isShutdown: Boolean = false
}
case None ⇒ fallback
}
provider被初始化为AbstractExecutorService的匿名实例 根据配置为ForkJoinPool
下面看看actorOf 创建Actor时的逻辑
try {
val dispatcher = system.dispatchers.lookup(props2.dispatcher)
val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)
if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
} catch {
case NonFatal(e) ⇒ throw new ConfigurationException(
s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e)
}
跟进任何一种ref 来看看RepointableActorRef
def newCell(old: UnstartedCell): Cell =
new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)
ActorCell的init
final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
val mbox = dispatcher.createMailbox(this, mailboxType)
/*
* The mailboxType was calculated taking into account what the MailboxType
* has promised to produce. If that was more than the default, then we need
* to reverify here because the dispatcher may well have screwed it up.
*/
// we need to delay the failure to the point of actor creation so we can handle
// it properly in the normal way
val actorClass = props.actorClass
val createMessage = mailboxType match {
case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
val req = system.mailboxes.getRequiredType(actorClass)
if (req isInstance mbox.messageQueue) Create(None)
else {
val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
Create(Some(ActorInitializationException(self,
s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
}
case _ ⇒ Create(None)
}
swapMailbox(mbox)
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, createMessage)
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
}
this
}
dispatcher.createMailbox(this, mailboxType) 创建mailbox
mailbox.setActor(this) mailbox注册到当前的actorCell中
当Cell start时
def start(): this.type = {
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
dispatcher.attach 这个actor
再来看看Mailbox的run方法
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
只看 processMailbox()
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}
}
throughput和throughputDeadlineTime在这里用到
ActorCell的父类中Dispatch 的sendMessage 方法定义了mailbox的执行
def sendMessage(msg: Envelope): Unit =
try {
if (system.settings.SerializeAllMessages) {
val unwrapped = (msg.message match {
case DeadLetter(wrapped, _, _) ⇒ wrapped
case other ⇒ other
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
}
}
dispatcher.dispatch(this, msg)
} catch handleException
回到了dispatcher.dispatch(this, msg)
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
看一下registerForExecution
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService execute mbox
true
} catch {
......
看到 executorService execute mbox
而sendMessage就是!
在actor内部的receive的方法中给一个actor发消息
那么此时相当于ForkJoinWorkerThread在执行executorService execute mbox
此时看到代码
final class AkkaForkJoinPool(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit = {
if (r eq null) throw new NullPointerException("The Runnable must not be null")
val task =
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
else new AkkaForkJoinTask(r)
Thread.currentThread match {
case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
case _ ⇒ super.execute(task)
}
}
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
mailbox会被包成AkkaForkJoinTask
task.fork() 以为这加入当前工作线程的工作队列尾部
加了一个池的限制,原始的ForkJoinTask的fork方法 是不区分池的
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}
综上 akka的Dispatcher还是单个队列多个消费线程策略,但是因为Actor的内部状态是线程安全的 所以一个Mailbox的messageQueue只能被一个工作线程执行,所以akka给mailbox加了状态
// Primary status
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
// Secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
// Shifted by 2: the suspend count!
final val shouldScheduleMask = 3
final val shouldNotProcessMask = ~2
final val suspendMask = ~3
final val suspendUnit = 4
并且在processMailbox方法里做状态判断
if (shouldProcessMessage) { ...}
利用CAS做状态转换
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
//设置为被占用
@tailrec
final def setAsScheduled(): Boolean = {
val s = currentStatus
/*
* Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set.
*/
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}
registerForExecution(mbox, true, false)方法执行时切换为被执行状态setAsScheduled。
这就导致某个线程在获得mailbox的Task时可能空转一次,然后执行别的Task
这有点让forkjoin的工作窃取 不是100%的高效
反观Netty的NioEventLoop 对I/O的操作窜行化,不发生线程的切换。利用率更高一点。