[关闭]
@xtccc 2016-09-13 09:19 字数 7457 阅读 2481

消息

给我写信
GitHub

此处输入图片的描述


Akka


目录:



1. Akka中的消息


Actors之间通过消息来进行交互。消息可以是任何数据类型,但是要求immutable。最佳的消息类型是case class,因为它可以很方便地实现pattern matching。


1.1 基本规则

Akka message的基本规则:


消息在进入接收者的mailbox时也是有序的,但是如果mailbox用了non-FIFO的的实现(e.g., PriorityMailbox),那么接收者处理这些消息的顺序可能与消息进入mailbox的顺序不同。


创建actor, 然后向它发送message....

创建actor的过程实际上是parent向child发送一个消息。如果发送的一系列消息中包含了这个initial creation message,并且没有小心地安排这些消息的顺序,那么可能会造成“向一个还未被创建的actor发送消息”的情况。

如果一个actor a创建一个actor b,然后a向b发送一条消息m,则可以保证b首先被创建,然后b才收到消息m。

但是,如果actor a创建一个actor b,然后a将关于b的引用(ActorRef)告诉另一个actor c,接着c向b发送消息,那么可能会发生“c向b发送消息时,b还未被创建”的情况。


1.2 Dead Letter

When a message is sent to an actor that is terminated before receiving the message, it will be sent as a DeadLetter to the ActorSystem's EventStream.


2. 发送消息


可以用过以下两种方式来向一个actor发送消息:



asktell 哪个更好?

There are performance implications of using ask since something needs to keep track of when it times out, there needs to be something that bridges a Promise into an ActorRef and it also needs to be reachable through remoting. So always prefer tell for performance, and only ask if you must.


2.1 Tell: Fire-and-Forget

非阻塞地发送消息

  1. actorRef ! message

如果 actorRef ! message 是在一个actor中被触发的,则sending actor reference也将同消息一起被发送至receiving actor(通过它的 sender(): ActorRef 方法);如果是在一个non-actor instance中被触发的,那么在默认情况下,receiving actor中的sender则将是deadLetters actor


2.2 Ask: Send-And-Receive-Future

非阻塞地向actor发送一条消息,然后返回一个Future。后续的代码不会等待,我们可以通过onComplete来设置当这个Future背后的逻辑运行完成并且返回一个结果时,应该触发什么样的操作。

  1. import java.util.Date
  2. import akka.actor.{Props, ActorSystem, Actor}
  3. import akka.util.Timeout
  4. import akka.pattern.ask
  5. import scala.concurrent.ExecutionContext.Implicits.global
  6. import scala.util.{Random, Failure, Success}
  7. import scala.concurrent.Future
  8. import scala.concurrent.duration._
  9. object AskPattern {
  10. def main(args: Array[String]): Unit = {
  11. val system = ActorSystem("Ask-Pattner-System")
  12. val actorA = system.actorOf(Props[ActorA])
  13. val actorB = system.actorOf(Props[ActorB])
  14. val actorC = system.actorOf(Props[ActorC])
  15. /**
  16. * 以ask模式来向一个actor发送消息时,需要一个 implicit `akka.util.Timeout`
  17. * */
  18. implicit val timeout = Timeout(20 seconds)
  19. /**
  20. * 向三个actor以ask的方式发送消息,并且等待它们的回复消息
  21. *
  22. * 由于f的类型是`Future`,所以在`for`循环中只能用ask而不能用tell
  23. * 因为tell不会返回Future
  24. *
  25. * 从运行结果来看, 这几个actor收到消息的顺序是 actorA -> actorB -> actorC
  26. * 为什么 ??
  27. * */
  28. val f: Future[Result] =
  29. for {
  30. a <- ask(actorA, RequestMsg("你好!")).mapTo[String]
  31. b <- (actorB ? RequestMsg("既来之, 则安之.")).mapTo[Int]
  32. c <- (actorC ? RequestMsg("再见.")).mapTo[String]
  33. } yield Result(a, b, c)
  34. /**
  35. * 等待Future运行完成后, Success/Failure才能被调用
  36. * */
  37. f onComplete {
  38. case Success(value) =>
  39. println(s"[${new Date}] Success - 结果是 ${value}")
  40. system.shutdown
  41. case Failure(ex) =>
  42. println(s"[${new Date}] Failure - 异常为 $ex")
  43. system.shutdown
  44. }
  45. println(s"[${new Date}] 我还在main中")
  46. println(s"[${new Date}] 你们应该会先看到我,再看到各个actors返回的结果")
  47. }
  48. trait Msg
  49. case class RequestMsg(info: String) extends Msg {}
  50. case class Result(a: String, b: Int, c: String) extends Msg {}
  51. class ActorA extends Actor {
  52. def receive = {
  53. case RequestMsg(info) =>
  54. println(s"[${new Date}] I'm Actor A ")
  55. Thread.sleep(Random.nextInt(6000))
  56. sender ! s"[Actor A] $info"
  57. }
  58. }
  59. class ActorB extends Actor {
  60. def receive = {
  61. case RequestMsg(info) =>
  62. println(s"[${new Date}] I'm Actor B ")
  63. Thread.sleep(Random.nextInt(6000))
  64. sender ! 10086
  65. }
  66. }
  67. class ActorC extends Actor {
  68. def receive = {
  69. case RequestMsg(info) =>
  70. println(s"[${new Date}] I'm Actor C ")
  71. Thread.sleep(Random.nextInt(6000))
  72. sender ! s"[Actor C] $info"
  73. }
  74. }
  75. }

运行结果为:
QQ20160306-1@2x.png-81.9kB


2.3 pipeTo

  1. package cn.gridx.scala.akka.tutorial.sendMessages
  2. import java.util.Date
  3. import akka.actor.{Props, Actor, ActorSystem}
  4. import akka.util.Timeout
  5. import akka.pattern.{ask, pipe}
  6. import scala.concurrent.Future
  7. import scala.util.{Failure, Success, Random}
  8. import scala.concurrent.duration._
  9. import scala.concurrent.ExecutionContext.Implicits.global
  10. /**
  11. 运行结果为
  12. [Mon Mar 07 10:23:48 CST 2016] Actor A : I received a message - [世界是你们的, 也是我们的]
  13. [Mon Mar 07 10:23:48 CST 2016] 我还在main中
  14. [Mon Mar 07 10:23:48 CST 2016] 你们也许会先看到我,再看到各个actors返回的结果
  15. [Mon Mar 07 10:23:54 CST 2016] Actor A : Replying to the sender
  16. [Mon Mar 07 10:23:54 CST 2016] Actor B : I received a message - [但归根结底还是你们的]
  17. [Mon Mar 07 10:23:57 CST 2016] Actor B : Replying to the sender
  18. [Mon Mar 07 10:23:57 CST 2016] Actor C 成功地收到了消息
  19. [Mon Mar 07 10:24:07 CST 2016] Actor C : I received a message - a = [大家好, 我是 Actor A], b = [10010]
  20. 注意观察最后两行的输出顺序
  21. */
  22. object PipeTo {
  23. implicit val timeout = Timeout(10 seconds)
  24. def main(args: Array[String]): Unit = {
  25. val system = ActorSystem("Pipe-To-Example")
  26. val actorA = system.actorOf(Props[ActorA])
  27. val actorB = system.actorOf(Props[ActorB])
  28. val actorC = system.actorOf(Props[ActorC])
  29. val f: Future[Result] =
  30. for {
  31. a <- ask(actorA, RequestMsg("世界是你们的, 也是我们的")).mapTo[String]
  32. b <- (actorB ? RequestMsg("但归根结底还是你们的")).mapTo[Int]
  33. } yield Result(a, b)
  34. /**
  35. *
  36. * 当f代表的计算完成后,将f发送给actorC
  37. * 一旦actorC收到了f(而不是等actorC运行完成), 则会触发`onComplete``
  38. */
  39. (f pipeTo actorC).onComplete {
  40. case Success(value) =>
  41. println(s"[${new Date}] Actor C 成功地收到了消息")
  42. system.shutdown
  43. case Failure(ex) =>
  44. println(s"[${new Date}] Actor C 发生了异常")
  45. system.shutdown
  46. }
  47. println(s"[${new Date}] 我还在main中")
  48. println(s"[${new Date}] 你们也许会先看到我,再看到各个actors返回的结果")
  49. }
  50. trait Msg
  51. case class RequestMsg(resp: String) extends Msg {}
  52. case class Result(a: String, b: Int) extends Msg {}
  53. class ActorA extends Actor {
  54. def receive = {
  55. case RequestMsg(req) =>
  56. println(s"[${new Date}] Actor A : I received a message - [$req]")
  57. Thread.sleep(Random.nextInt(10000))
  58. println(s"[${new Date}] Actor A : Replying to the sender")
  59. sender ! "大家好, 我是 Actor A"
  60. case _ =>
  61. println(s"[${new Date}] Actor A : unexpected message received !")
  62. }
  63. }
  64. class ActorB extends Actor {
  65. def receive = {
  66. case RequestMsg(req) =>
  67. println(s"[${new Date}] Actor B : I received a message - [$req]")
  68. Thread.sleep(Random.nextInt(10000))
  69. println(s"[${new Date}] Actor B : Replying to the sender")
  70. sender ! 10010
  71. case _ =>
  72. println(s"[${new Date}] Actor B : unexpected message received !")
  73. }
  74. }
  75. class ActorC extends Actor {
  76. def receive = {
  77. case Result(a, b) =>
  78. Thread.sleep(10000)
  79. println(s"[${new Date}] Actor C : I received a message - a = [$a], b = [$b]")
  80. case _ =>
  81. println(s"[${new Date}] Actor C : unexpected message received !")
  82. }
  83. }
  84. }




3. 接收消息


一个Actor只能一次处理一条消息,无法同时处理多条消息。


3.1 receive

在Actor中,用于接收消息的方法是receive。它是一个 partial function,类型为PartialFunction[Any, Unit]


3.2 接收消息超时

可以为一个actor设置接收消息超时的机制:当该actor被创建并启动后,如果它在一定时间内没有收到任何消息,则Akka System会向它发送一个类型为ReceiveTimeout的超时消息,该actor需要自己捕获该消息并进行处理。

  1. package cn.gridx.scala.akka.tutorial.messages
  2. import java.util.Date
  3. import akka.actor.{Props, ActorSystem, ReceiveTimeout, Actor}
  4. import scala.concurrent.duration._
  5. /**
  6. * Created by tao on 3/7/16.
  7. 输出为
  8. [Mon Mar 07 21:36:14 CST 2016] Main: 启动了actor, sleep 10秒 ...
  9. [Mon Mar 07 21:36:15 CST 2016] Actor A : #1 ReceiveTimeout exception
  10. [Mon Mar 07 21:36:16 CST 2016] Actor A : #2 ReceiveTimeout exception
  11. [Mon Mar 07 21:36:17 CST 2016] Actor A : #3 ReceiveTimeout exception
  12. [Mon Mar 07 21:36:24 CST 2016] Main: 醒来, 现在shutdown system
  13. */
  14. object ReceiveMsg {
  15. def main(args: Array[String]): Unit = {
  16. /**
  17. * 创建并启动一个actor, 但是不向它发送任何消息
  18. * */
  19. val system = ActorSystem("Receive-Timeout-Example")
  20. system.actorOf(Props[ActorA])
  21. // 主线程睡眠10秒钟
  22. println(s"[${new Date}] Main: 启动了actor, sleep 10秒 ...")
  23. Thread.sleep(10000)
  24. println(s"[${new Date}] Main: 醒来, 现在shutdown system")
  25. // 关闭 actor system
  26. system.shutdown
  27. }
  28. trait Msg
  29. case class RequestMsg(msg: String) {}
  30. class ActorA extends Actor {
  31. // 计数: 第几次收到`ReceiveTimeout`
  32. var i = 1
  33. // 为该actor设置超时时间
  34. context.setReceiveTimeout(1 seconds)
  35. /**
  36. * `receive`是一个偏函数,它的类型是PartialFunction[Any, Unit]
  37. * Scala中的"match/case clause"就是一个偏函数
  38. *
  39. * 如果该actor在设定的超时时间内没有收到任何消息,
  40. * 则系统会自动向它发送一个`ReceiveTimeout`消息
  41. * */
  42. def receive: PartialFunction[Any, Unit] = {
  43. case RequestMsg(msg) =>
  44. println(s"[${new Date}] Actor A : received a message [$msg]")
  45. sender ! "Hello"
  46. case ReceiveTimeout =>
  47. println(s"[${new Date}] Actor A : #$i ReceiveTimeout exception")
  48. if (i >= 3) // 传入`Duration.Undefined`可以取消接收消息超时的机制
  49. context.setReceiveTimeout(Duration.Undefined)
  50. else
  51. i += 1
  52. }
  53. }
  54. }

值得注意的是:如果一直没有收到消息,则Akka System会每隔一段时间向该actor发送ReceiveTimeout消息。如果向context.setReceiveTimeout 传入参数 Duration.Undefined则可以取消接收消息超时的机制。




添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注