@MiloXia
2015-03-23T18:38:54.000000Z
字数 13239
阅读 3826
并发
Fork&Join框架的论文:http://gee.cs.oswego.edu/dl/papers/fj.pdf
论文翻译:http://ifeve.com/a-java-fork-join-framework/
Fork/Join框架执行的任务有以下局限性:
Fork/Join框架的核心是由以下两个类:
ForkJoinPool:它实现ExecutorService接口和work-stealing算法。它管理工作线程和提供关于任务的状态和它们执行的信息。
ForkJoinTask: 它是将在ForkJoinPool中执行的任务的基类。它提供在任务中执行fork()和join()操作的机制,并且这两个方法控制任务的状态。通常, 为了实现你的Fork/Join任务,你将实现两个子类的子类的类:RecursiveAction对于没有返回结果的任务和RecursiveTask 对于返回结果的任务。
使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:
if (当前这个任务工作量足够小)
直接完成这个任务
else
将这个任务或这部分工作分解成两个部分
分别触发(invoke)这两个子任务的执行,并等待结
你需要将这段代码包裹在一个ForkJoinTask的子类中。不过,通常情况下会使用一种更为具体的的类型,或者是RecursiveTask(会返回一个结果),或者是RecursiveAction。
当你的ForkJoinTask子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool实例的invoke()方法即可。
Fork/Join框架(二)创建一个Fork/Join池 http://ifeve.com/fork-join-2/
Fork/Join框架(四)异步运行任务 http://ifeve.com/fork-join-4/
当你在ForkJoinPool中执行ForkJoinTask时,你可以使用同步或异步方式来实现。
当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给池的任务完成它的执行。这允许ForkJoinPool类使用work-stealing算法,分配一个新的任务给正在执行睡眠任务的工作线程。反之,当你使用异步方法(比如:fork()方法),这个任务将继续它的执行,所以ForkJoinPool类不能使用work-stealing算法来提高应用程序的性能。在这种情况下,只有当你调用join()或get()方法来等待任务的完成时,ForkJoinPool才能使用work-stealing算法。
http://www.molotang.com/articles/696.html
http://ifeve.com/talk-concurrency-forkjoin/
ActorSystem初始化时
//读取配置文件创建dispatchers(默认的配置文件只有一个dispatcher)
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
//返回默认的dispatcher (可在配置里更改)
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
new Dispatchers(...)中
//读取配置文件
val defaultDispatcherConfig: Config =
idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
配置文件为:
default-dispatcher {
executor=thread-pool-executor.
type = "Dispatcher"
executor = "default-executor"
default-executor {
#默认为forkjoinpool jdk7才支持
fallback = "fork-join-executor"
}
# forkjoinpool默认线程数 max(min(cpu线程数 * 3.0, 64), 8)
fork-join-executor {
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
}
......
}
ActorSystem中
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
中的defaultGlobalDispatcher方法
def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
lookup调用了lookupConfigurator(id).dispatcher()
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
case null ⇒
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap.
val newConfigurator =
if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
case null ⇒ newConfigurator
case existing ⇒ existing
}
case existing ⇒ existing
}
}
主要是configuratorFrom方法
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
case "BalancingDispatcher" ⇒
// FIXME remove this case in 2.4
throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
classOf[BalancingDispatcherConfigurator].getName)
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn ⇒
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
case exception ⇒
throw new ConfigurationException(
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
"make sure it has constructor with [com.typesafe.config.Config] and " +
"[akka.dispatch.DispatcherPrerequisites] parameters")
.format(fqn, cfg.getString("id")), exception)
}).get
}
}
根据配置"type"为"Dispatcher" 则为new DispatcherConfigurator
DispatcherConfigurator类创建了
private val instance = new Dispatcher(
this,
config.getString("id"),
config.getInt("throughput"),
config.getNanosDuration("throughput-deadline-time"),
configureExecutor(),
config.getMillisDuration("shutdown-timeout"))
/**
* Returns the same dispatcher instance for each invocation
*/
override def dispatcher(): MessageDispatcher = instance
来看Dispatcher类 "throughput"和"throughput-deadline-time"干嘛用后面会说
这类定义了dispatch方法用来执行mailbox[后面会说到]还有createMailbox创建mailbox
其父类MessageDispatcher定义了将actor加入dispatcher的attach方法
new Dispatcher 类时的configureExecutor()方法创建了forkjoinpool
def configureExecutor(): ExecutorServiceConfigurator = {
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn ⇒
val args = List(
classOf[Config] -> config,
classOf[DispatcherPrerequisites] -> prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
case exception ⇒ throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}).get
}
config.getString("executor") match {
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other ⇒ configurator(other)
}
}
根据配置直接走new ForkJoinExecutorConfigurator 看看这个类
class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
}
final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory ⇒
// add the dispatcher id to the thread names
m.withName(m.name + "-" + id)
case other ⇒ other
}
new ForkJoinExecutorServiceFactory(
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")))
}
new ForkJoinExecutorServiceFactory 提供了线程池的factory
读取了几个计算初始线程池大小的参数 会创建 AkkaForkJoinPool
继续回到configureExecutor()方法的
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator
来看看DefaultExecutorServiceConfigurator
val provider: ExecutorServiceFactoryProvider =
prerequisites.defaultExecutionContext match {
case Some(ec) ⇒
prerequisites.eventStream.publish(....)
new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
def createExecutorService: ExecutorService = this
def shutdown(): Unit = ()
def isTerminated: Boolean = false
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
def execute(command: Runnable): Unit = ec.execute(command)
def isShutdown: Boolean = false
}
case None ⇒ fallback
}
provider被初始化为AbstractExecutorService的匿名实例 根据配置为ForkJoinPool
下面看看actorOf 创建Actor时的逻辑
try {
val dispatcher = system.dispatchers.lookup(props2.dispatcher)
val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)
if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
} catch {
case NonFatal(e) ⇒ throw new ConfigurationException(
s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e)
}
跟进任何一种ref 来看看RepointableActorRef
def newCell(old: UnstartedCell): Cell =
new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)
ActorCell的init
final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
val mbox = dispatcher.createMailbox(this, mailboxType)
/*
* The mailboxType was calculated taking into account what the MailboxType
* has promised to produce. If that was more than the default, then we need
* to reverify here because the dispatcher may well have screwed it up.
*/
// we need to delay the failure to the point of actor creation so we can handle
// it properly in the normal way
val actorClass = props.actorClass
val createMessage = mailboxType match {
case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
val req = system.mailboxes.getRequiredType(actorClass)
if (req isInstance mbox.messageQueue) Create(None)
else {
val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
Create(Some(ActorInitializationException(self,
s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
}
case _ ⇒ Create(None)
}
swapMailbox(mbox)
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, createMessage)
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
}
this
}
dispatcher.createMailbox(this, mailboxType) 创建mailbox
mailbox.setActor(this) mailbox注册到当前的actorCell中
当Cell start时
def start(): this.type = {
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
dispatcher.attach 这个actor
再来看看Mailbox的run方法
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
只看 processMailbox()
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
}
}
throughput和throughputDeadlineTime在这里用到
ActorCell的父类中Dispatch 的sendMessage 方法定义了mailbox的执行
def sendMessage(msg: Envelope): Unit =
try {
if (system.settings.SerializeAllMessages) {
val unwrapped = (msg.message match {
case DeadLetter(wrapped, _, _) ⇒ wrapped
case other ⇒ other
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
}
}
dispatcher.dispatch(this, msg)
} catch handleException
回到了dispatcher.dispatch(this, msg)
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
看一下registerForExecution
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService execute mbox
true
} catch {
......
看到 executorService execute mbox
而sendMessage就是!
在actor内部的receive的方法中给一个actor发消息
那么此时相当于ForkJoinWorkerThread在执行executorService execute mbox
此时看到代码
final class AkkaForkJoinPool(parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
override def execute(r: Runnable): Unit = {
if (r eq null) throw new NullPointerException("The Runnable must not be null")
val task =
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
else new AkkaForkJoinTask(r)
Thread.currentThread match {
case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
case _ ⇒ super.execute(task)
}
}
def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
mailbox会被包成AkkaForkJoinTask
task.fork() 以为这加入当前工作线程的工作队列尾部