@xtccc
2016-09-13T01:19:53.000000Z
字数 7457
阅读 2817
Akka
目录:
Actors之间通过消息来进行交互。消息可以是任何数据类型,但是要求immutable。最佳的消息类型是case class,因为它可以很方便地实现pattern matching。
Akka message的基本规则:
消息在进入接收者的mailbox时也是有序的,但是如果mailbox用了non-FIFO的的实现(e.g., PriorityMailbox),那么接收者处理这些消息的顺序可能与消息进入mailbox的顺序不同。
创建actor, 然后向它发送message....
创建actor的过程实际上是parent向child发送一个消息。如果发送的一系列消息中包含了这个initial creation message,并且没有小心地安排这些消息的顺序,那么可能会造成“向一个还未被创建的actor发送消息”的情况。
如果一个actor a创建一个actor b,然后a向b发送一条消息m,则可以保证b首先被创建,然后b才收到消息m。
但是,如果actor a创建一个actor b,然后a将关于b的引用(ActorRef)告诉另一个actor c,接着c向b发送消息,那么可能会发生“c向b发送消息时,b还未被创建”的情况。
When a message is sent to an actor that is terminated before receiving the message, it will be sent as a DeadLetter to the ActorSystem's EventStream.
可以用过以下两种方式来向一个actor发送消息:
!
! 表示“fire-and-forget”,即异步地向actor发送一条消息,然后立即返回。这也被称为tell
?
? 向actor异步地发送一条消息,然后立即返回一个Future。这也被称为ask
ask 与 tell 哪个更好?
There are performance implications of using ask since something needs to keep track of when it times out, there needs to be something that bridges a Promise into an ActorRef and it also needs to be reachable through remoting. So always prefer tell for performance, and only ask if you must.
非阻塞地发送消息
actorRef ! message
如果 actorRef ! message 是在一个actor中被触发的,则sending actor reference也将同消息一起被发送至receiving actor(通过它的 sender(): ActorRef 方法);如果是在一个non-actor instance中被触发的,那么在默认情况下,receiving actor中的sender则将是deadLetters actor。
非阻塞地向actor发送一条消息,然后返回一个Future。后续的代码不会等待,我们可以通过onComplete来设置当这个Future背后的逻辑运行完成并且返回一个结果时,应该触发什么样的操作。
import java.util.Dateimport akka.actor.{Props, ActorSystem, Actor}import akka.util.Timeoutimport akka.pattern.askimport scala.concurrent.ExecutionContext.Implicits.globalimport scala.util.{Random, Failure, Success}import scala.concurrent.Futureimport scala.concurrent.duration._object AskPattern {def main(args: Array[String]): Unit = {val system = ActorSystem("Ask-Pattner-System")val actorA = system.actorOf(Props[ActorA])val actorB = system.actorOf(Props[ActorB])val actorC = system.actorOf(Props[ActorC])/*** 以ask模式来向一个actor发送消息时,需要一个 implicit `akka.util.Timeout`* */implicit val timeout = Timeout(20 seconds)/*** 向三个actor以ask的方式发送消息,并且等待它们的回复消息** 由于f的类型是`Future`,所以在`for`循环中只能用ask而不能用tell* 因为tell不会返回Future** 从运行结果来看, 这几个actor收到消息的顺序是 actorA -> actorB -> actorC* 为什么 ??* */val f: Future[Result] =for {a <- ask(actorA, RequestMsg("你好!")).mapTo[String]b <- (actorB ? RequestMsg("既来之, 则安之.")).mapTo[Int]c <- (actorC ? RequestMsg("再见.")).mapTo[String]} yield Result(a, b, c)/*** 等待Future运行完成后, Success/Failure才能被调用* */f onComplete {case Success(value) =>println(s"[${new Date}] Success - 结果是 ${value}")system.shutdowncase Failure(ex) =>println(s"[${new Date}] Failure - 异常为 $ex")system.shutdown}println(s"[${new Date}] 我还在main中")println(s"[${new Date}] 你们应该会先看到我,再看到各个actors返回的结果")}trait Msgcase class RequestMsg(info: String) extends Msg {}case class Result(a: String, b: Int, c: String) extends Msg {}class ActorA extends Actor {def receive = {case RequestMsg(info) =>println(s"[${new Date}] I'm Actor A ")Thread.sleep(Random.nextInt(6000))sender ! s"[Actor A] $info"}}class ActorB extends Actor {def receive = {case RequestMsg(info) =>println(s"[${new Date}] I'm Actor B ")Thread.sleep(Random.nextInt(6000))sender ! 10086}}class ActorC extends Actor {def receive = {case RequestMsg(info) =>println(s"[${new Date}] I'm Actor C ")Thread.sleep(Random.nextInt(6000))sender ! s"[Actor C] $info"}}}
运行结果为:

package cn.gridx.scala.akka.tutorial.sendMessagesimport java.util.Dateimport akka.actor.{Props, Actor, ActorSystem}import akka.util.Timeoutimport akka.pattern.{ask, pipe}import scala.concurrent.Futureimport scala.util.{Failure, Success, Random}import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.global/**运行结果为[Mon Mar 07 10:23:48 CST 2016] Actor A : I received a message - [世界是你们的, 也是我们的][Mon Mar 07 10:23:48 CST 2016] 我还在main中[Mon Mar 07 10:23:48 CST 2016] 你们也许会先看到我,再看到各个actors返回的结果[Mon Mar 07 10:23:54 CST 2016] Actor A : Replying to the sender[Mon Mar 07 10:23:54 CST 2016] Actor B : I received a message - [但归根结底还是你们的][Mon Mar 07 10:23:57 CST 2016] Actor B : Replying to the sender[Mon Mar 07 10:23:57 CST 2016] Actor C 成功地收到了消息[Mon Mar 07 10:24:07 CST 2016] Actor C : I received a message - a = [大家好, 我是 Actor A], b = [10010]注意观察最后两行的输出顺序*/object PipeTo {implicit val timeout = Timeout(10 seconds)def main(args: Array[String]): Unit = {val system = ActorSystem("Pipe-To-Example")val actorA = system.actorOf(Props[ActorA])val actorB = system.actorOf(Props[ActorB])val actorC = system.actorOf(Props[ActorC])val f: Future[Result] =for {a <- ask(actorA, RequestMsg("世界是你们的, 也是我们的")).mapTo[String]b <- (actorB ? RequestMsg("但归根结底还是你们的")).mapTo[Int]} yield Result(a, b)/**** 当f代表的计算完成后,将f发送给actorC* 一旦actorC收到了f(而不是等actorC运行完成), 则会触发`onComplete``*/(f pipeTo actorC).onComplete {case Success(value) =>println(s"[${new Date}] Actor C 成功地收到了消息")system.shutdowncase Failure(ex) =>println(s"[${new Date}] Actor C 发生了异常")system.shutdown}println(s"[${new Date}] 我还在main中")println(s"[${new Date}] 你们也许会先看到我,再看到各个actors返回的结果")}trait Msgcase class RequestMsg(resp: String) extends Msg {}case class Result(a: String, b: Int) extends Msg {}class ActorA extends Actor {def receive = {case RequestMsg(req) =>println(s"[${new Date}] Actor A : I received a message - [$req]")Thread.sleep(Random.nextInt(10000))println(s"[${new Date}] Actor A : Replying to the sender")sender ! "大家好, 我是 Actor A"case _ =>println(s"[${new Date}] Actor A : unexpected message received !")}}class ActorB extends Actor {def receive = {case RequestMsg(req) =>println(s"[${new Date}] Actor B : I received a message - [$req]")Thread.sleep(Random.nextInt(10000))println(s"[${new Date}] Actor B : Replying to the sender")sender ! 10010case _ =>println(s"[${new Date}] Actor B : unexpected message received !")}}class ActorC extends Actor {def receive = {case Result(a, b) =>Thread.sleep(10000)println(s"[${new Date}] Actor C : I received a message - a = [$a], b = [$b]")case _ =>println(s"[${new Date}] Actor C : unexpected message received !")}}}
一个Actor只能一次处理一条消息,无法同时处理多条消息。
在Actor中,用于接收消息的方法是receive。它是一个 partial function,类型为PartialFunction[Any, Unit]。
可以为一个actor设置接收消息超时的机制:当该actor被创建并启动后,如果它在一定时间内没有收到任何消息,则Akka System会向它发送一个类型为ReceiveTimeout的超时消息,该actor需要自己捕获该消息并进行处理。
package cn.gridx.scala.akka.tutorial.messagesimport java.util.Dateimport akka.actor.{Props, ActorSystem, ReceiveTimeout, Actor}import scala.concurrent.duration._/*** Created by tao on 3/7/16.输出为[Mon Mar 07 21:36:14 CST 2016] Main: 启动了actor, sleep 10秒 ...[Mon Mar 07 21:36:15 CST 2016] Actor A : #1 ReceiveTimeout exception[Mon Mar 07 21:36:16 CST 2016] Actor A : #2 ReceiveTimeout exception[Mon Mar 07 21:36:17 CST 2016] Actor A : #3 ReceiveTimeout exception[Mon Mar 07 21:36:24 CST 2016] Main: 醒来, 现在shutdown system*/object ReceiveMsg {def main(args: Array[String]): Unit = {/*** 创建并启动一个actor, 但是不向它发送任何消息* */val system = ActorSystem("Receive-Timeout-Example")system.actorOf(Props[ActorA])// 主线程睡眠10秒钟println(s"[${new Date}] Main: 启动了actor, sleep 10秒 ...")Thread.sleep(10000)println(s"[${new Date}] Main: 醒来, 现在shutdown system")// 关闭 actor systemsystem.shutdown}trait Msgcase class RequestMsg(msg: String) {}class ActorA extends Actor {// 计数: 第几次收到`ReceiveTimeout`var i = 1// 为该actor设置超时时间context.setReceiveTimeout(1 seconds)/*** `receive`是一个偏函数,它的类型是PartialFunction[Any, Unit]* Scala中的"match/case clause"就是一个偏函数** 如果该actor在设定的超时时间内没有收到任何消息,* 则系统会自动向它发送一个`ReceiveTimeout`消息* */def receive: PartialFunction[Any, Unit] = {case RequestMsg(msg) =>println(s"[${new Date}] Actor A : received a message [$msg]")sender ! "Hello"case ReceiveTimeout =>println(s"[${new Date}] Actor A : #$i ReceiveTimeout exception")if (i >= 3) // 传入`Duration.Undefined`可以取消接收消息超时的机制context.setReceiveTimeout(Duration.Undefined)elsei += 1}}}
值得注意的是:如果一直没有收到消息,则Akka System会每隔一段时间向该actor发送ReceiveTimeout消息。如果向context.setReceiveTimeout 传入参数 Duration.Undefined则可以取消接收消息超时的机制。