[关闭]
@xtccc 2016-07-22T15:17:43.000000Z 字数 11084 阅读 3845

Akka 第一课

给我写信
GitHub

此处输入图片的描述

Akka


参考链接:


目录


1. Akka简介


1.0 Akka and Actors

Akka’s approach to handling concurrency is based on the Actor Model.

Akka creates a layer between the actors and the underlying system such that actors simply need to process messages.


1.1 Akka Modules

Akka是高度模块化的,包含以下的JAR包:


1.2 关于Actor

What is Actor?

An actor is a container for State, Behavior, a Mailbox, Children Actors, and a Supervisor Strategy. All of these are encapsulated behind an Actor Reference. Actor有明确的生命周期,当不需要再使用它们时,我们应该显式地将其销毁。

Actors can only be created by other actors —— where the top-level actor is provided by the system —— and each created actor is supervised by its parent.


Actor Reference

当我们要与某个actor交互时,我们只会通过actor reference与之交互,而不会直接操作actor本身。

将Actor与Actor Reference分离会带来好处,例如:重启actor时不需要更新actor reference。


Actor and Thread

Akka will run sets of actors on sets of threads, where typically many actors share one thread, and subsequent invocatins of one actor may end up being processed on different threads.



State

When the actor fails and is restarted by its supervisor, the state will be created from scratch, like upon first creating the actor.

Optionally, an actor's state can be automatically recovered to the state before a restart by persisting the received messages and replaying them after the restart. See Persistence.



Mailbox

包括 FIFO Mailbox, Priority Mailbox

一条消息被sender发出,由另一个receiver接收,receiver actor的mailbox的作用就是连接sender与receiver。



Children

An actor can create children for delegating sub-tasks, and it will automatically supervise them. The list of children is maintained within the actor's context and the actor has access to it.

Modification to the list is done by creating (context.actorOf(···)) or stopping (context.stop(child)) children. The actual creation and termination actions happen behind the scene in asynchronous way so that they do not block the supervisor.


1.3 Supervision, Fault Tolerance and Monitoring

The final piece of an actor is its strategy for handling faults of its children. Fault handling is then done transparently by Akka, applying one of the strategies described in Supervision and Monitoring for each incoming failure. As this strategy is fundamental to how an actor system is structured, it cannot be changed once an actor has been created.

Once an actor terminates, i.e. fails in a way which is not handled by a restart, stops itself or is stopped by its supervisor, it will free up its resources, draining all remaining messages from its mailbox into the system’s dead letter mailbox which will forward them to the EventStream as DeadLetters. The mailbox is then replaced within the actor reference with a system mailbox, redirecting all new messages to the EventStream as DeadLetters. This is done on a best effort basis, though, so do not rely on it in order to construct “guaranteed delivery”.

在Akka系统中,每一个actor都是它自己的children的supervisor。如果某个actor a 无法处理某条消息,那么 a 会将自身及其children实施挂起(suspend),并向 a 的supervisor发送一条消息(通常是exception的形式)。

In Akka, the way in which a supervisor reacts to and handles exceptions that percolates up to it from its children is referred to as a supervisor strgegy. Supervisor strategies are the primary and straightforward mechanism by which you define the fault tolerant behavior of your system.

当一个supervisor actor收到了一条代表失败的消息时,它可以有以下选择:

  • Resume the child (and its children), keeping its accumulated internal state.
  • Restart the child (and its children), cleaing out its accumulated internal state.
  • Stop the child (and its children) permanently.
  • Escalate the failure, thereby failing itself.


1.4 Actor Path and Address

参考 AKKA: Actor References, Paths and Addresses

ActorPath.png-42.3kB

Actor Path

An actor path consists of an anchor, which identifies the actor system, followed by the concatenation of the path elements, from root guardian to the designated actor; the path elements are the names of the traversed actors and are separated by slashes.


Actor Path Anchors

Each actor path has an address component, describing the protocol and location by which the corresponding actor is reachable, followed by the names of the actors in the hierarchy from the root up.

Examples:
akka://my-sys/user/service-a/worker1 // purely local
akka.tcp://my-sys@host.example.com:5678/user/service-b // remote


Actors的逻辑路径与物理路径





2. 一个最简单的Akka系统

我们将创建一个简单地Akka系统,实现如下功能:

系统中有1个Master和1个Listener。用户向Master发送开始的指令,Master收到后启动5个Workers并向它们每人发送一条消息。每个Worker收到消息后,向Master回应一条新的消息。Master集齐了5个Worker的消息后,就这些消息集中起来并发送给Listener。Listener收到消息后将它打印出来。
QQ20160226-1@2x.png-1558.8kB


  1. import java.util.Date
  2. import akka.actor.{ActorSystem, ActorRef, Props, Actor}
  3. import akka.routing.RoundRobinRouter
  4. import scala.collection.mutable
  5. import scala.util.Random
  6. def main(args: Array[String]) {
  7. println("[main] 创建简单的Akka系统")
  8. /** `ActorSystem` 是一个Akka
  9. container,包含在该context中创建的全部actors
  10. `ActorSystem` is a heavy object: create only one per application */
  11. val system = ActorSystem("Simple-Akka-System")
  12. /** 使用`ActorSystem#actorOf`可以该container内创建新的actors
  13. * `listener` 和 `master` 使我们创建的两个 top level actors */
  14. val listener = system.actorOf(Props[Listener], "listener")
  15. val master = system.actorOf(Props(new Master(listener)), "master")
  16. println("[main] 开始启动Master")
  17. master ! Msg_Start(3)
  18. }
  19. sealed trait Msg
  20. case class Msg_Start(num: Int) extends Msg
  21. case class Msg_Finished(result: mutable.HashMap[Int, String]) extends Msg
  22. case class Msg_Req(index: Int) extends Msg
  23. case class Msg_Resp(index: Int, resp: String) extends Msg
  24. class Master(listener: ActorRef) extends Actor {
  25. val result = new mutable.HashMap[Int, String]()
  26. var numWorkers = 0
  27. def receive = {
  28. /** 收到listener的消息, 开始启动`num`个workers */
  29. case Msg_Start(num) =>
  30. println(s"[master] 收到`Msg_Start`消息,将创建 $num 个workers")
  31. numWorkers = num
  32. /** we create a round-robin router to make it easier to
  33. spread out the work evenly between the workers
  34. 在`Master`中再创建actors,这些actors都要被`Master`管理
  35. [http://doc.akka.io/docs/akka/2.0/scala/routing.html#routing-scala] */
  36. val workerRouter = context.actorOf(
  37. Props[Worker].withRouter(RoundRobinRouter(num)), "Worker-Router")
  38. for (i <- 0 until num) {
  39. println(s"[master] 向worker发送消息`Msg_Req($i)`")
  40. workerRouter ! Msg_Req(i)
  41. }
  42. /** 收到 worker 的响应消息 */
  43. case Msg_Resp(index, resp) =>
  44. println(s"[master] 收到`Msg_Resp`消息, index = $index, resp = $resp")
  45. result.put(index, resp)
  46. // 如果收到了全部worker的响应消息,则把最终结果发送给listener
  47. if (result.size == numWorkers) {
  48. println(s"[master] 来自 $numWorkers 个workers的消息接收完毕, 将最终结果发送给listener")
  49. listener ! Msg_Finished(result)
  50. /** stop itself and all its supervised actors (e.g., workers) */
  51. println("[master] 即将关闭master自身,以及master管理的所有workers")
  52. context.stop(self)
  53. }
  54. }
  55. }
  56. class Worker extends Actor {
  57. def receive = {
  58. // 收到Master发来的消息, 处理消息, 然后向Master回应一条消息
  59. case Msg_Req(index) =>
  60. println(s"[worker $index]: 收到来自master的消息")
  61. Thread.sleep(Random.nextInt(10000))
  62. sender ! Msg_Resp(index, s"我是Worker[$index], 现在时间是 ${new Date()}")
  63. }
  64. }
  65. class Listener extends Actor {
  66. def receive = {
  67. case Msg_Finished(result) =>
  68. println("[listener] 收到来自Master的消息")
  69. println(s"[listener] 结果为\n\t" +
  70. s"${result.mkString(", \n\t")}")
  71. println("[listener] 即将关闭Akka System")
  72. context.system.shutdown
  73. }
  74. }

运行结果为:
QQ20160226-2@2x.png-273.9kB




3. 细节讲解


3.1 创建Akka Actor

通过扩展akka.actor.Actor接口并实现其中的receive方法(其中,case statements的类型应该是 PartialFunction[Any, Unit]),就可以定义自己的Akka Actor class。

receive方法的返回值是一个partial function object,它将被存储在该actor中作为它的initial behavior。一个actor被创建后,也可以改变它的behavior,参考Become and Unbecome

例子

  1. import akka.actor.{Props, ActorSystem}
  2. import akka.event.Logging
  3. def main(args: Array[String]): Unit = {
  4. val system = ActorSystem("test-akka-actor")
  5. val actor = system.actorOf(Props[MyAkkaActor], "xiaotao-actor")
  6. actor ! "你好"
  7. actor ! true
  8. actor ! 100
  9. system.shutdown
  10. }
  11. class MyAkkaActor extends akka.actor.Actor {
  12. val log = Logging(context.system, this)
  13. def receive = {
  14. case x: String =>
  15. println(s"String -> $x")
  16. case y: Boolean =>
  17. println(s"Boolean -> $y")
  18. case _ =>
  19. println("Unknown msg")
  20. }
  21. }

输出:
QQ20160226-3@2x.png-16.7kB


使用ActorSystem将创建top-level actors,这些actors将被由Actor系统提供的Guardian Actor管理。ActorSystem是一个重量级的对象,一个应用应该仅创建一次。

ActorRef是指向被创建的Actor Instance的句柄(handle),它是可序列化的,并且可以被跨网络传输。


3.2 receive方法

akka.actor.Actor中的receive方法自身是被循环调用的,而scala.actors.Actor中的receive方法则只能被调用一次,如果要想它能被循环调用,则要自己如下处理:

  1. def main(args: Array[String]) {
  2. val actor = new MyScalaActor()
  3. actor.start
  4. actor ! "你好"
  5. actor ! true
  6. actor ! 100
  7. }
  8. class MyScalaActor extends scala.actors.Actor {
  9. override def act() = {
  10. while (true) receive { // 必须加上while才能让`receive`被循环调用
  11. case x: String =>
  12. println(s"String -> $x")
  13. case y: Boolean =>
  14. println(s"Boolean -> $y")
  15. case _ =>
  16. println("Unknown msg")
  17. }
  18. }
  19. }

如果收到消息不能被receive处理,那么方法unhandled会被调用,在默认情况下,unhandled会向actor system's event stream发送一条消息:akka.actor.UnhandledMessage(message, sender, recepient)。如果将配置项akka.actor.debug.unhandled 设置为 on 就这可以将该消息转换成实际的debug message。


3.3 ActorSystem#actorOf

使用 ActorSystem#actorOf 会创建一个顶层actor,它将被系统提供的guardian actor所监控。 ActorSystem 是一个重量级的对象,在一个app中应该仅创建一个这样的实例。

actorOf方法返回一个ActorRef类型的实例,它是可序列化的。

Akka Actor一但被创建,就会被自动地异步启动。

另外,不要在actor中创建另一个actor,那样做很危险!

3.4 Actor Trait

Atcor是一个trait,它定义了一个抽象方法receive,需要我们去实现。除以以外,它还提供了以下方法:


3.5 Props

在创建Actor时,可以通过Props来为actor指定一些配置。有三种方式来使用Props:

  1. val prop1: Props = Props[MyAkkaActor]
  2. val prop2: Props = Props(new MyAkkaActor(100, "One")) // 小心这种方式
  3. val prop3: Props = Props(classOf[MyAkkaActor], 100, "One")

其中,第2种方式不应该在某个actor内使用!因为不应该在一个actor内创建新的actor。也不要把某个actor的this引用传给Props


3.4 Tell (!) and Ask (?) Patterns

Ask Pattern (?)
Ask pattern有它的局限性:

  1. can not receive multiple replies
  2. can not watch other actors' lifecycle

通过 Inbox 可以解决这些问题。




4. Watch


参考 Akka/Scala: How to monitor the death of an Actor with 'watch'

通过ActorContext的watch方法,Actor a 可以监视关于 Actor b 的终止事件:当b终止时(通过StopPoisonKillactor context stop self、或者gracefulStop),a将被通知(收到Terminated消息)。

  1. package cn.gridx.scala.akka.tutorial.lifecycle
  2. import akka.actor._
  3. /**
  4. * Created by tao on 3/8/16.
  5. *
  6. * 运行输出为:
  7. 向kenny发送命令`PoisonPill`
  8. 向jack发送命令`Stop`
  9. 向lucy发送命令`Kill`
  10. [Child] Jack Chen will stop itself
  11. [Parent] OMG, kenny挂了
  12. [Parent] OMG, jack挂了
  13. [Parent] OMG, lucy挂了
  14. [ERROR] [03/08/2016 17:00:48.942] [Watch-Actors-Example-akka.actor.default-dispatcher-5] [akka://Watch-Actors-Example/user/tommy/lucy] Kill (akka.actor.ActorKilledException)
  15. */
  16. object WatchActors {
  17. def main(args: Array[String]): Unit = {
  18. val system = ActorSystem("Watch-Actors-Example")
  19. val parent = system.actorOf(Props[Parent], "tommy")
  20. // 找到各个child actors
  21. val kenny = system.actorSelection("/user/tommy/kenny")
  22. val jack = system.actorSelection("/user/tommy/jack")
  23. val lucy = system.actorSelection("/user/tommy/lucy")
  24. println("向kenny发送命令`PoisonPill`")
  25. kenny ! PoisonPill
  26. println("向jack发送命令`Stop`")
  27. jack ! "Stop"
  28. println("向lucy发送命令`Kill`")
  29. lucy ! Kill
  30. Thread.sleep(5000)
  31. system.shutdown
  32. }
  33. class Parent extends Actor {
  34. // parent生成一个child actor,然后监控它被stop或者kill的事件
  35. val kenny = context.actorOf(Props(classOf[Child], "Kenny Lee"), "kenny")
  36. val jack = context.actorOf(Props(classOf[Child], "Jack Chen"), "jack")
  37. val lucy = context.actorOf(Props(classOf[Child], "Lucy K"), "lucy")
  38. context.watch(kenny)
  39. context.watch(jack)
  40. context.watch(lucy)
  41. def receive = {
  42. case Terminated(actor) =>
  43. println(s"[Parent] OMG, ${actor.path.name}挂了")
  44. case _ =>
  45. println("[Parent] Parent got an unknown message")
  46. }
  47. }
  48. class Child(name: String) extends Actor {
  49. def receive = {
  50. case "Stop" =>
  51. println(s"[Child] $name will stop itself")
  52. context stop self
  53. case PoisonPill =>
  54. println(s"""[Child] $name got a "PoisonPill" message """)
  55. case Kill =>
  56. println(s"""[Child] $name got a "Kill" message """)
  57. case _ =>
  58. println(s"[Child] $name got an unknown message")
  59. }
  60. }
  61. }

值得注意的是,在child actor的receive方法中,尽管我们试图捕获PosionPillKill,但是它们并不会被actor捕获。




添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注