1. Akka中的消息

Actors之间通过消息来进行交互。消息可以是任何数据类型,但是要求immutable。最佳的消息类型是case class,因为它可以很方便地实现pattern matching。

1.1 基本规则

Akka message的基本规则:

消息在进入接收者的mailbox时也是有序的,但是如果mailbox用了non-FIFO的的实现(e.g., PriorityMailbox),那么接收者处理这些消息的顺序可能与消息进入mailbox的顺序不同。

创建actor, 然后向它发送message....

创建actor的过程实际上是parent向child发送一个消息。如果发送的一系列消息中包含了这个initial creation message,并且没有小心地安排这些消息的顺序,那么可能会造成“向一个还未被创建的actor发送消息”的情况。

如果一个actor a创建一个actor b,然后a向b发送一条消息m,则可以保证b首先被创建,然后b才收到消息m。

但是,如果actor a创建一个actor b,然后a将关于b的引用(ActorRef)告诉另一个actor c,接着c向b发送消息,那么可能会发生“c向b发送消息时,b还未被创建”的情况。

1.2 Dead Letter

When a message is sent to an actor that is terminated before receiving the message, it will be sent as a DeadLetter to the ActorSystem's EventStream.

2. 发送消息


asktell 哪个更好?

There are performance implications of using ask since something needs to keep track of when it times out, there needs to be something that bridges a Promise into an ActorRef and it also needs to be reachable through remoting. So always prefer tell for performance, and only ask if you must.

2.1 Tell: Fire-and-Forget


  1. actorRef ! message

如果 actorRef ! message 是在一个actor中被触发的,则sending actor reference也将同消息一起被发送至receiving actor(通过它的 sender(): ActorRef 方法);如果是在一个non-actor instance中被触发的,那么在默认情况下,receiving actor中的sender则将是deadLetters actor

2.2 Ask: Send-And-Receive-Future


  1. import java.util.Date
  2. import akka.actor.{Props, ActorSystem, Actor}
  3. import akka.util.Timeout
  4. import akka.pattern.ask
  5. import scala.concurrent.ExecutionContext.Implicits.global
  6. import scala.util.{Random, Failure, Success}
  7. import scala.concurrent.Future
  8. import scala.concurrent.duration._
  9. object AskPattern {
  10. def main(args: Array[String]): Unit = {
  11. val system = ActorSystem("Ask-Pattner-System")
  12. val actorA = system.actorOf(Props[ActorA])
  13. val actorB = system.actorOf(Props[ActorB])
  14. val actorC = system.actorOf(Props[ActorC])
  15. /**
  16. * 以ask模式来向一个actor发送消息时,需要一个 implicit `akka.util.Timeout`
  17. * */
  18. implicit val timeout = Timeout(20 seconds)
  19. /**
  20. * 向三个actor以ask的方式发送消息,并且等待它们的回复消息
  21. *
  22. * 由于f的类型是`Future`,所以在`for`循环中只能用ask而不能用tell
  23. * 因为tell不会返回Future
  24. *
  25. * 从运行结果来看, 这几个actor收到消息的顺序是 actorA -> actorB -> actorC
  26. * 为什么 ??
  27. * */
  28. val f: Future[Result] =
  29. for {
  30. a <- ask(actorA, RequestMsg("你好!")).mapTo[String]
  31. b <- (actorB ? RequestMsg("既来之, 则安之.")).mapTo[Int]
  32. c <- (actorC ? RequestMsg("再见.")).mapTo[String]
  33. } yield Result(a, b, c)
  34. /**
  35. * 等待Future运行完成后, Success/Failure才能被调用
  36. * */
  37. f onComplete {
  38. case Success(value) =>
  39. println(s"[${new Date}] Success - 结果是 ${value}")
  40. system.shutdown
  41. case Failure(ex) =>
  42. println(s"[${new Date}] Failure - 异常为 $ex")
  43. system.shutdown
  44. }
  45. println(s"[${new Date}] 我还在main中")
  46. println(s"[${new Date}] 你们应该会先看到我,再看到各个actors返回的结果")
  47. }
  48. trait Msg
  49. case class RequestMsg(info: String) extends Msg {}
  50. case class Result(a: String, b: Int, c: String) extends Msg {}
  51. class ActorA extends Actor {
  52. def receive = {
  53. case RequestMsg(info) =>
  54. println(s"[${new Date}] I'm Actor A ")
  55. Thread.sleep(Random.nextInt(6000))
  56. sender ! s"[Actor A] $info"
  57. }
  58. }
  59. class ActorB extends Actor {
  60. def receive = {
  61. case RequestMsg(info) =>
  62. println(s"[${new Date}] I'm Actor B ")
  63. Thread.sleep(Random.nextInt(6000))
  64. sender ! 10086
  65. }
  66. }
  67. class ActorC extends Actor {
  68. def receive = {
  69. case RequestMsg(info) =>
  70. println(s"[${new Date}] I'm Actor C ")
  71. Thread.sleep(Random.nextInt(6000))
  72. sender ! s"[Actor C] $info"
  73. }
  74. }
  75. }


2.3 pipeTo

  1. package cn.gridx.scala.akka.tutorial.sendMessages
  2. import java.util.Date
  3. import akka.actor.{Props, Actor, ActorSystem}
  4. import akka.util.Timeout
  5. import akka.pattern.{ask, pipe}
  6. import scala.concurrent.Future
  7. import scala.util.{Failure, Success, Random}
  8. import scala.concurrent.duration._
  9. import scala.concurrent.ExecutionContext.Implicits.global
  10. /**
  11. 运行结果为
  12. [Mon Mar 07 10:23:48 CST 2016] Actor A : I received a message - [世界是你们的, 也是我们的]
  13. [Mon Mar 07 10:23:48 CST 2016] 我还在main中
  14. [Mon Mar 07 10:23:48 CST 2016] 你们也许会先看到我,再看到各个actors返回的结果
  15. [Mon Mar 07 10:23:54 CST 2016] Actor A : Replying to the sender
  16. [Mon Mar 07 10:23:54 CST 2016] Actor B : I received a message - [但归根结底还是你们的]
  17. [Mon Mar 07 10:23:57 CST 2016] Actor B : Replying to the sender
  18. [Mon Mar 07 10:23:57 CST 2016] Actor C 成功地收到了消息
  19. [Mon Mar 07 10:24:07 CST 2016] Actor C : I received a message - a = [大家好, 我是 Actor A], b = [10010]
  20. 注意观察最后两行的输出顺序
  21. */
  22. object PipeTo {
  23. implicit val timeout = Timeout(10 seconds)
  24. def main(args: Array[String]): Unit = {
  25. val system = ActorSystem("Pipe-To-Example")
  26. val actorA = system.actorOf(Props[ActorA])
  27. val actorB = system.actorOf(Props[ActorB])
  28. val actorC = system.actorOf(Props[ActorC])
  29. val f: Future[Result] =
  30. for {
  31. a <- ask(actorA, RequestMsg("世界是你们的, 也是我们的")).mapTo[String]
  32. b <- (actorB ? RequestMsg("但归根结底还是你们的")).mapTo[Int]
  33. } yield Result(a, b)
  34. /**
  35. *
  36. * 当f代表的计算完成后,将f发送给actorC
  37. * 一旦actorC收到了f(而不是等actorC运行完成), 则会触发`onComplete``
  38. */
  39. (f pipeTo actorC).onComplete {
  40. case Success(value) =>
  41. println(s"[${new Date}] Actor C 成功地收到了消息")
  42. system.shutdown
  43. case Failure(ex) =>
  44. println(s"[${new Date}] Actor C 发生了异常")
  45. system.shutdown
  46. }
  47. println(s"[${new Date}] 我还在main中")
  48. println(s"[${new Date}] 你们也许会先看到我,再看到各个actors返回的结果")
  49. }
  50. trait Msg
  51. case class RequestMsg(resp: String) extends Msg {}
  52. case class Result(a: String, b: Int) extends Msg {}
  53. class ActorA extends Actor {
  54. def receive = {
  55. case RequestMsg(req) =>
  56. println(s"[${new Date}] Actor A : I received a message - [$req]")
  57. Thread.sleep(Random.nextInt(10000))
  58. println(s"[${new Date}] Actor A : Replying to the sender")
  59. sender ! "大家好, 我是 Actor A"
  60. case _ =>
  61. println(s"[${new Date}] Actor A : unexpected message received !")
  62. }
  63. }
  64. class ActorB extends Actor {
  65. def receive = {
  66. case RequestMsg(req) =>
  67. println(s"[${new Date}] Actor B : I received a message - [$req]")
  68. Thread.sleep(Random.nextInt(10000))
  69. println(s"[${new Date}] Actor B : Replying to the sender")
  70. sender ! 10010
  71. case _ =>
  72. println(s"[${new Date}] Actor B : unexpected message received !")
  73. }
  74. }
  75. class ActorC extends Actor {
  76. def receive = {
  77. case Result(a, b) =>
  78. Thread.sleep(10000)
  79. println(s"[${new Date}] Actor C : I received a message - a = [$a], b = [$b]")
  80. case _ =>
  81. println(s"[${new Date}] Actor C : unexpected message received !")
  82. }
  83. }
  84. }

3. 接收消息


3.1 receive

在Actor中,用于接收消息的方法是receive。它是一个 partial function,类型为PartialFunction[Any, Unit]

3.2 接收消息超时

可以为一个actor设置接收消息超时的机制:当该actor被创建并启动后,如果它在一定时间内没有收到任何消息,则Akka System会向它发送一个类型为ReceiveTimeout的超时消息,该actor需要自己捕获该消息并进行处理。

  1. package cn.gridx.scala.akka.tutorial.messages
  2. import java.util.Date
  3. import akka.actor.{Props, ActorSystem, ReceiveTimeout, Actor}
  4. import scala.concurrent.duration._
  5. /**
  6. * Created by tao on 3/7/16.
  7. 输出为
  8. [Mon Mar 07 21:36:14 CST 2016] Main: 启动了actor, sleep 10秒 ...
  9. [Mon Mar 07 21:36:15 CST 2016] Actor A : #1 ReceiveTimeout exception
  10. [Mon Mar 07 21:36:16 CST 2016] Actor A : #2 ReceiveTimeout exception
  11. [Mon Mar 07 21:36:17 CST 2016] Actor A : #3 ReceiveTimeout exception
  12. [Mon Mar 07 21:36:24 CST 2016] Main: 醒来, 现在shutdown system
  13. */
  14. object ReceiveMsg {
  15. def main(args: Array[String]): Unit = {
  16. /**
  17. * 创建并启动一个actor, 但是不向它发送任何消息
  18. * */
  19. val system = ActorSystem("Receive-Timeout-Example")
  20. system.actorOf(Props[ActorA])
  21. // 主线程睡眠10秒钟
  22. println(s"[${new Date}] Main: 启动了actor, sleep 10秒 ...")
  23. Thread.sleep(10000)
  24. println(s"[${new Date}] Main: 醒来, 现在shutdown system")
  25. // 关闭 actor system
  26. system.shutdown
  27. }
  28. trait Msg
  29. case class RequestMsg(msg: String) {}
  30. class ActorA extends Actor {
  31. // 计数: 第几次收到`ReceiveTimeout`
  32. var i = 1
  33. // 为该actor设置超时时间
  34. context.setReceiveTimeout(1 seconds)
  35. /**
  36. * `receive`是一个偏函数,它的类型是PartialFunction[Any, Unit]
  37. * Scala中的"match/case clause"就是一个偏函数
  38. *
  39. * 如果该actor在设定的超时时间内没有收到任何消息,
  40. * 则系统会自动向它发送一个`ReceiveTimeout`消息
  41. * */
  42. def receive: PartialFunction[Any, Unit] = {
  43. case RequestMsg(msg) =>
  44. println(s"[${new Date}] Actor A : received a message [$msg]")
  45. sender ! "Hello"
  46. case ReceiveTimeout =>
  47. println(s"[${new Date}] Actor A : #$i ReceiveTimeout exception")
  48. if (i >= 3) // 传入`Duration.Undefined`可以取消接收消息超时的机制
  49. context.setReceiveTimeout(Duration.Undefined)
  50. else
  51. i += 1
  52. }
  53. }
  54. }

值得注意的是:如果一直没有收到消息,则Akka System会每隔一段时间向该actor发送ReceiveTimeout消息。如果向context.setReceiveTimeout 传入参数 Duration.Undefined则可以取消接收消息超时的机制。
