@xtccc
2016-07-22T15:17:43.000000Z
字数 11084
阅读 3803
Akka
参考链接:
目录
Akka’s approach to handling concurrency is based on the Actor Model.
Akka creates a layer between the actors and the underlying system such that actors simply need to process messages.
Akka是高度模块化的,包含以下的JAR包:
akka-actor :
Classic Actors, Typed Actors, IO Actor, etc.
akka-agent :
Agents, integrated with Scala STM.
akka-camel :
akka-cluster :
Cluster membership management, elastic router.
akka-kernel :
Akka microkernel for running a bare-bones mini application server.
akka-remote :
Remote actors.
akka-slf4j :
SLF4J Logger (event bus listener)
What is Actor?
An actor is a container for State, Behavior, a Mailbox, Children Actors, and a Supervisor Strategy. All of these are encapsulated behind an Actor Reference. Actor有明确的生命周期,当不需要再使用它们时,我们应该显式地将其销毁。
Actors can only be created by other actors —— where the top-level actor is provided by the system —— and each created actor is supervised by its parent.
Actor Reference
当我们要与某个actor交互时,我们只会通过actor reference与之交互,而不会直接操作actor本身。
将Actor与Actor Reference分离会带来好处,例如:重启actor时不需要更新actor reference。
Actor and Thread
Akka will run sets of actors on sets of threads, where typically many actors share one thread, and subsequent invocatins of one actor may end up being processed on different threads.
State
When the actor fails and is restarted by its supervisor, the state will be created from scratch, like upon first creating the actor.
Optionally, an actor's state can be automatically recovered to the state before a restart by persisting the received messages and replaying them after the restart. See Persistence.
Mailbox
包括 FIFO Mailbox, Priority Mailbox
一条消息被sender发出,由另一个receiver接收,receiver actor的mailbox的作用就是连接sender与receiver。
Children
An actor can create children for delegating sub-tasks, and it will automatically supervise them. The list of children is maintained within the actor's context and the actor has access to it.
Modification to the list is done by creating (context.actorOf(···)) or stopping (context.stop(child)) children. The actual creation and termination actions happen behind the scene in asynchronous way so that they do not block the supervisor.
The final piece of an actor is its strategy for handling faults of its children. Fault handling is then done transparently by Akka, applying one of the strategies described in Supervision and Monitoring for each incoming failure. As this strategy is fundamental to how an actor system is structured, it cannot be changed once an actor has been created.
Once an actor terminates, i.e. fails in a way which is not handled by a restart, stops itself or is stopped by its supervisor, it will free up its resources, draining all remaining messages from its mailbox into the system’s dead letter mailbox which will forward them to the EventStream as DeadLetters. The mailbox is then replaced within the actor reference with a system mailbox, redirecting all new messages to the EventStream as DeadLetters. This is done on a best effort basis, though, so do not rely on it in order to construct “guaranteed delivery”.
在Akka系统中,每一个actor都是它自己的children的supervisor。如果某个actor a 无法处理某条消息,那么 a 会将自身及其children实施挂起(suspend),并向 a 的supervisor发送一条消息(通常是exception的形式)。
In Akka, the way in which a supervisor reacts to and handles exceptions that percolates up to it from its children is referred to as a supervisor strgegy. Supervisor strategies are the primary and straightforward mechanism by which you define the fault tolerant behavior of your system.
当一个supervisor actor收到了一条代表失败的消息时,它可以有以下选择:
- Resume the child (and its children), keeping its accumulated internal state.
- Restart the child (and its children), cleaing out its accumulated internal state.
- Stop the child (and its children) permanently.
- Escalate the failure, thereby failing itself.
参考 AKKA: Actor References, Paths and Addresses
Actor Path
An actor path consists of an anchor, which identifies the actor system, followed by the concatenation of the path elements, from root guardian to the designated actor; the path elements are the names of the traversed actors and are separated by slashes.
Actor Path Anchors
Each actor path has an address component, describing the protocol and location by which the corresponding actor is reachable, followed by the names of the actors in the hierarchy from the root up.
Examples:
akka://my-sys/user/service-a/worker1
// purely local
akka.tcp://my-sys@host.example.com:5678/user/service-b
// remote
Actors的逻辑路径与物理路径
我们将创建一个简单地Akka系统,实现如下功能:
系统中有1个Master和1个Listener。用户向Master发送开始的指令,Master收到后启动5个Workers并向它们每人发送一条消息。每个Worker收到消息后,向Master回应一条新的消息。Master集齐了5个Worker的消息后,就这些消息集中起来并发送给Listener。Listener收到消息后将它打印出来。
import java.util.Date
import akka.actor.{ActorSystem, ActorRef, Props, Actor}
import akka.routing.RoundRobinRouter
import scala.collection.mutable
import scala.util.Random
def main(args: Array[String]) {
println("[main] 创建简单的Akka系统")
/** `ActorSystem` 是一个Akka
container,包含在该context中创建的全部actors
`ActorSystem` is a heavy object: create only one per application */
val system = ActorSystem("Simple-Akka-System")
/** 使用`ActorSystem#actorOf`可以该container内创建新的actors
* `listener` 和 `master` 使我们创建的两个 top level actors */
val listener = system.actorOf(Props[Listener], "listener")
val master = system.actorOf(Props(new Master(listener)), "master")
println("[main] 开始启动Master")
master ! Msg_Start(3)
}
sealed trait Msg
case class Msg_Start(num: Int) extends Msg
case class Msg_Finished(result: mutable.HashMap[Int, String]) extends Msg
case class Msg_Req(index: Int) extends Msg
case class Msg_Resp(index: Int, resp: String) extends Msg
class Master(listener: ActorRef) extends Actor {
val result = new mutable.HashMap[Int, String]()
var numWorkers = 0
def receive = {
/** 收到listener的消息, 开始启动`num`个workers */
case Msg_Start(num) =>
println(s"[master] 收到`Msg_Start`消息,将创建 $num 个workers")
numWorkers = num
/** we create a round-robin router to make it easier to
spread out the work evenly between the workers
在`Master`中再创建actors,这些actors都要被`Master`管理
[http://doc.akka.io/docs/akka/2.0/scala/routing.html#routing-scala] */
val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(num)), "Worker-Router")
for (i <- 0 until num) {
println(s"[master] 向worker发送消息`Msg_Req($i)`")
workerRouter ! Msg_Req(i)
}
/** 收到 worker 的响应消息 */
case Msg_Resp(index, resp) =>
println(s"[master] 收到`Msg_Resp`消息, index = $index, resp = $resp")
result.put(index, resp)
// 如果收到了全部worker的响应消息,则把最终结果发送给listener
if (result.size == numWorkers) {
println(s"[master] 来自 $numWorkers 个workers的消息接收完毕, 将最终结果发送给listener")
listener ! Msg_Finished(result)
/** stop itself and all its supervised actors (e.g., workers) */
println("[master] 即将关闭master自身,以及master管理的所有workers")
context.stop(self)
}
}
}
class Worker extends Actor {
def receive = {
// 收到Master发来的消息, 处理消息, 然后向Master回应一条消息
case Msg_Req(index) =>
println(s"[worker $index]: 收到来自master的消息")
Thread.sleep(Random.nextInt(10000))
sender ! Msg_Resp(index, s"我是Worker[$index], 现在时间是 ${new Date()}")
}
}
class Listener extends Actor {
def receive = {
case Msg_Finished(result) =>
println("[listener] 收到来自Master的消息")
println(s"[listener] 结果为\n\t" +
s"${result.mkString(", \n\t")}")
println("[listener] 即将关闭Akka System")
context.system.shutdown
}
}
运行结果为:
通过扩展akka.actor.Actor接口并实现其中的receive方法(其中,case statements的类型应该是 PartialFunction[Any, Unit]
),就可以定义自己的Akka Actor class。
receive
方法的返回值是一个partial function object,它将被存储在该actor中作为它的initial behavior。一个actor被创建后,也可以改变它的behavior,参考Become and Unbecome
例子
import akka.actor.{Props, ActorSystem}
import akka.event.Logging
def main(args: Array[String]): Unit = {
val system = ActorSystem("test-akka-actor")
val actor = system.actorOf(Props[MyAkkaActor], "xiaotao-actor")
actor ! "你好"
actor ! true
actor ! 100
system.shutdown
}
class MyAkkaActor extends akka.actor.Actor {
val log = Logging(context.system, this)
def receive = {
case x: String =>
println(s"String -> $x")
case y: Boolean =>
println(s"Boolean -> $y")
case _ =>
println("Unknown msg")
}
}
输出:
使用ActorSystem
将创建top-level actors,这些actors将被由Actor系统提供的Guardian Actor管理。ActorSystem
是一个重量级的对象,一个应用应该仅创建一次。
ActorRef是指向被创建的Actor Instance的句柄(handle),它是可序列化的,并且可以被跨网络传输。
akka.actor.Actor中的receive
方法自身是被循环调用的,而scala.actors.Actor中的receive
方法则只能被调用一次,如果要想它能被循环调用,则要自己如下处理:
def main(args: Array[String]) {
val actor = new MyScalaActor()
actor.start
actor ! "你好"
actor ! true
actor ! 100
}
class MyScalaActor extends scala.actors.Actor {
override def act() = {
while (true) receive { // 必须加上while才能让`receive`被循环调用
case x: String =>
println(s"String -> $x")
case y: Boolean =>
println(s"Boolean -> $y")
case _ =>
println("Unknown msg")
}
}
}
如果收到消息不能被receive处理,那么方法unhandled会被调用,在默认情况下,unhandled会向actor system's event stream发送一条消息:akka.actor.UnhandledMessage(message, sender, recepient)。如果将配置项akka.actor.debug.unhandled 设置为 on 就这可以将该消息转换成实际的debug message。
使用 ActorSystem#actorOf
会创建一个顶层actor,它将被系统提供的guardian actor所监控。 ActorSystem 是一个重量级的对象,在一个app中应该仅创建一个这样的实例。
actorOf
方法返回一个ActorRef
类型的实例,它是可序列化的。
Akka Actor一但被创建,就会被自动地异步启动。
另外,不要在actor中创建另一个actor,那样做很危险!
Atcor
是一个trait,它定义了一个抽象方法receive
,需要我们去实现。除以以外,它还提供了以下方法:
在创建Actor时,可以通过Props来为actor指定一些配置。有三种方式来使用Props:
val prop1: Props = Props[MyAkkaActor]
val prop2: Props = Props(new MyAkkaActor(100, "One")) // 小心这种方式
val prop3: Props = Props(classOf[MyAkkaActor], 100, "One")
其中,第2种方式不应该在某个actor内使用!因为不应该在一个actor内创建新的actor。也不要把某个actor的this引用传给Props。
Ask Pattern (?)
Ask pattern有它的局限性:
- can not receive multiple replies
- can not watch other actors' lifecycle
通过 Inbox 可以解决这些问题。
参考 Akka/Scala: How to monitor the death of an Actor with 'watch'
通过ActorContext的watch
方法,Actor a 可以监视关于 Actor b 的终止事件:当b终止时(通过Stop、PoisonKill、actor context stop self、或者gracefulStop),a将被通知(收到Terminated消息)。
package cn.gridx.scala.akka.tutorial.lifecycle
import akka.actor._
/**
* Created by tao on 3/8/16.
*
* 运行输出为:
向kenny发送命令`PoisonPill`
向jack发送命令`Stop`
向lucy发送命令`Kill`
[Child] Jack Chen will stop itself
[Parent] OMG, kenny挂了
[Parent] OMG, jack挂了
[Parent] OMG, lucy挂了
[ERROR] [03/08/2016 17:00:48.942] [Watch-Actors-Example-akka.actor.default-dispatcher-5] [akka://Watch-Actors-Example/user/tommy/lucy] Kill (akka.actor.ActorKilledException)
*/
object WatchActors {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Watch-Actors-Example")
val parent = system.actorOf(Props[Parent], "tommy")
// 找到各个child actors
val kenny = system.actorSelection("/user/tommy/kenny")
val jack = system.actorSelection("/user/tommy/jack")
val lucy = system.actorSelection("/user/tommy/lucy")
println("向kenny发送命令`PoisonPill`")
kenny ! PoisonPill
println("向jack发送命令`Stop`")
jack ! "Stop"
println("向lucy发送命令`Kill`")
lucy ! Kill
Thread.sleep(5000)
system.shutdown
}
class Parent extends Actor {
// parent生成一个child actor,然后监控它被stop或者kill的事件
val kenny = context.actorOf(Props(classOf[Child], "Kenny Lee"), "kenny")
val jack = context.actorOf(Props(classOf[Child], "Jack Chen"), "jack")
val lucy = context.actorOf(Props(classOf[Child], "Lucy K"), "lucy")
context.watch(kenny)
context.watch(jack)
context.watch(lucy)
def receive = {
case Terminated(actor) =>
println(s"[Parent] OMG, ${actor.path.name}挂了")
case _ =>
println("[Parent] Parent got an unknown message")
}
}
class Child(name: String) extends Actor {
def receive = {
case "Stop" =>
println(s"[Child] $name will stop itself")
context stop self
case PoisonPill =>
println(s"""[Child] $name got a "PoisonPill" message """)
case Kill =>
println(s"""[Child] $name got a "Kill" message """)
case _ =>
println(s"[Child] $name got an unknown message")
}
}
}
值得注意的是,在child actor的receive
方法中,尽管我们试图捕获PosionPill
和Kill
,但是它们并不会被actor捕获。