@xtccc
2016-09-13 09:19
字数 7457
阅读 2481
Akka
目录:
Actors之间通过消息来进行交互。消息可以是任何数据类型,但是要求immutable。最佳的消息类型是case class,因为它可以很方便地实现pattern matching。
Akka message的基本规则:
消息在进入接收者的mailbox时也是有序的,但是如果mailbox用了non-FIFO的的实现(e.g., PriorityMailbox),那么接收者处理这些消息的顺序可能与消息进入mailbox的顺序不同。
创建actor, 然后向它发送message....
创建actor的过程实际上是parent向child发送一个消息。如果发送的一系列消息中包含了这个initial creation message,并且没有小心地安排这些消息的顺序,那么可能会造成“向一个还未被创建的actor发送消息”的情况。
如果一个actor a创建一个actor b,然后a向b发送一条消息m,则可以保证b首先被创建,然后b才收到消息m。
但是,如果actor a创建一个actor b,然后a将关于b的引用(ActorRef)告诉另一个actor c,接着c向b发送消息,那么可能会发生“c向b发送消息时,b还未被创建”的情况。
When a message is sent to an actor that is terminated before receiving the message, it will be sent as a DeadLetter to the ActorSystem's EventStream.
可以用过以下两种方式来向一个actor发送消息:
!
!
表示“fire-and-forget”,即异步地向actor发送一条消息,然后立即返回。这也被称为tell
?
?
向actor异步地发送一条消息,然后立即返回一个Future
。这也被称为ask
ask 与 tell 哪个更好?
There are performance implications of using ask since something needs to keep track of when it times out, there needs to be something that bridges a Promise into an ActorRef and it also needs to be reachable through remoting. So always prefer tell for performance, and only ask if you must.
非阻塞地发送消息
actorRef ! message
如果 actorRef ! message
是在一个actor中被触发的,则sending actor reference也将同消息一起被发送至receiving actor(通过它的 sender(): ActorRef
方法);如果是在一个non-actor instance中被触发的,那么在默认情况下,receiving actor中的sender则将是deadLetters actor。
非阻塞地向actor发送一条消息,然后返回一个Future。后续的代码不会等待,我们可以通过onComplete
来设置当这个Future背后的逻辑运行完成并且返回一个结果时,应该触发什么样的操作。
import java.util.Date
import akka.actor.{Props, ActorSystem, Actor}
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Random, Failure, Success}
import scala.concurrent.Future
import scala.concurrent.duration._
object AskPattern {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Ask-Pattner-System")
val actorA = system.actorOf(Props[ActorA])
val actorB = system.actorOf(Props[ActorB])
val actorC = system.actorOf(Props[ActorC])
/**
* 以ask模式来向一个actor发送消息时,需要一个 implicit `akka.util.Timeout`
* */
implicit val timeout = Timeout(20 seconds)
/**
* 向三个actor以ask的方式发送消息,并且等待它们的回复消息
*
* 由于f的类型是`Future`,所以在`for`循环中只能用ask而不能用tell
* 因为tell不会返回Future
*
* 从运行结果来看, 这几个actor收到消息的顺序是 actorA -> actorB -> actorC
* 为什么 ??
* */
val f: Future[Result] =
for {
a <- ask(actorA, RequestMsg("你好!")).mapTo[String]
b <- (actorB ? RequestMsg("既来之, 则安之.")).mapTo[Int]
c <- (actorC ? RequestMsg("再见.")).mapTo[String]
} yield Result(a, b, c)
/**
* 等待Future运行完成后, Success/Failure才能被调用
* */
f onComplete {
case Success(value) =>
println(s"[${new Date}] Success - 结果是 ${value}")
system.shutdown
case Failure(ex) =>
println(s"[${new Date}] Failure - 异常为 $ex")
system.shutdown
}
println(s"[${new Date}] 我还在main中")
println(s"[${new Date}] 你们应该会先看到我,再看到各个actors返回的结果")
}
trait Msg
case class RequestMsg(info: String) extends Msg {}
case class Result(a: String, b: Int, c: String) extends Msg {}
class ActorA extends Actor {
def receive = {
case RequestMsg(info) =>
println(s"[${new Date}] I'm Actor A ")
Thread.sleep(Random.nextInt(6000))
sender ! s"[Actor A] $info"
}
}
class ActorB extends Actor {
def receive = {
case RequestMsg(info) =>
println(s"[${new Date}] I'm Actor B ")
Thread.sleep(Random.nextInt(6000))
sender ! 10086
}
}
class ActorC extends Actor {
def receive = {
case RequestMsg(info) =>
println(s"[${new Date}] I'm Actor C ")
Thread.sleep(Random.nextInt(6000))
sender ! s"[Actor C] $info"
}
}
}
运行结果为:
package cn.gridx.scala.akka.tutorial.sendMessages
import java.util.Date
import akka.actor.{Props, Actor, ActorSystem}
import akka.util.Timeout
import akka.pattern.{ask, pipe}
import scala.concurrent.Future
import scala.util.{Failure, Success, Random}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
运行结果为
[Mon Mar 07 10:23:48 CST 2016] Actor A : I received a message - [世界是你们的, 也是我们的]
[Mon Mar 07 10:23:48 CST 2016] 我还在main中
[Mon Mar 07 10:23:48 CST 2016] 你们也许会先看到我,再看到各个actors返回的结果
[Mon Mar 07 10:23:54 CST 2016] Actor A : Replying to the sender
[Mon Mar 07 10:23:54 CST 2016] Actor B : I received a message - [但归根结底还是你们的]
[Mon Mar 07 10:23:57 CST 2016] Actor B : Replying to the sender
[Mon Mar 07 10:23:57 CST 2016] Actor C 成功地收到了消息
[Mon Mar 07 10:24:07 CST 2016] Actor C : I received a message - a = [大家好, 我是 Actor A], b = [10010]
注意观察最后两行的输出顺序
*/
object PipeTo {
implicit val timeout = Timeout(10 seconds)
def main(args: Array[String]): Unit = {
val system = ActorSystem("Pipe-To-Example")
val actorA = system.actorOf(Props[ActorA])
val actorB = system.actorOf(Props[ActorB])
val actorC = system.actorOf(Props[ActorC])
val f: Future[Result] =
for {
a <- ask(actorA, RequestMsg("世界是你们的, 也是我们的")).mapTo[String]
b <- (actorB ? RequestMsg("但归根结底还是你们的")).mapTo[Int]
} yield Result(a, b)
/**
*
* 当f代表的计算完成后,将f发送给actorC
* 一旦actorC收到了f(而不是等actorC运行完成), 则会触发`onComplete``
*/
(f pipeTo actorC).onComplete {
case Success(value) =>
println(s"[${new Date}] Actor C 成功地收到了消息")
system.shutdown
case Failure(ex) =>
println(s"[${new Date}] Actor C 发生了异常")
system.shutdown
}
println(s"[${new Date}] 我还在main中")
println(s"[${new Date}] 你们也许会先看到我,再看到各个actors返回的结果")
}
trait Msg
case class RequestMsg(resp: String) extends Msg {}
case class Result(a: String, b: Int) extends Msg {}
class ActorA extends Actor {
def receive = {
case RequestMsg(req) =>
println(s"[${new Date}] Actor A : I received a message - [$req]")
Thread.sleep(Random.nextInt(10000))
println(s"[${new Date}] Actor A : Replying to the sender")
sender ! "大家好, 我是 Actor A"
case _ =>
println(s"[${new Date}] Actor A : unexpected message received !")
}
}
class ActorB extends Actor {
def receive = {
case RequestMsg(req) =>
println(s"[${new Date}] Actor B : I received a message - [$req]")
Thread.sleep(Random.nextInt(10000))
println(s"[${new Date}] Actor B : Replying to the sender")
sender ! 10010
case _ =>
println(s"[${new Date}] Actor B : unexpected message received !")
}
}
class ActorC extends Actor {
def receive = {
case Result(a, b) =>
Thread.sleep(10000)
println(s"[${new Date}] Actor C : I received a message - a = [$a], b = [$b]")
case _ =>
println(s"[${new Date}] Actor C : unexpected message received !")
}
}
}
一个Actor只能一次处理一条消息,无法同时处理多条消息。
在Actor中,用于接收消息的方法是receive
。它是一个 partial function,类型为PartialFunction[Any, Unit]
。
可以为一个actor设置接收消息超时的机制:当该actor被创建并启动后,如果它在一定时间内没有收到任何消息,则Akka System会向它发送一个类型为ReceiveTimeout
的超时消息,该actor需要自己捕获该消息并进行处理。
package cn.gridx.scala.akka.tutorial.messages
import java.util.Date
import akka.actor.{Props, ActorSystem, ReceiveTimeout, Actor}
import scala.concurrent.duration._
/**
* Created by tao on 3/7/16.
输出为
[Mon Mar 07 21:36:14 CST 2016] Main: 启动了actor, sleep 10秒 ...
[Mon Mar 07 21:36:15 CST 2016] Actor A : #1 ReceiveTimeout exception
[Mon Mar 07 21:36:16 CST 2016] Actor A : #2 ReceiveTimeout exception
[Mon Mar 07 21:36:17 CST 2016] Actor A : #3 ReceiveTimeout exception
[Mon Mar 07 21:36:24 CST 2016] Main: 醒来, 现在shutdown system
*/
object ReceiveMsg {
def main(args: Array[String]): Unit = {
/**
* 创建并启动一个actor, 但是不向它发送任何消息
* */
val system = ActorSystem("Receive-Timeout-Example")
system.actorOf(Props[ActorA])
// 主线程睡眠10秒钟
println(s"[${new Date}] Main: 启动了actor, sleep 10秒 ...")
Thread.sleep(10000)
println(s"[${new Date}] Main: 醒来, 现在shutdown system")
// 关闭 actor system
system.shutdown
}
trait Msg
case class RequestMsg(msg: String) {}
class ActorA extends Actor {
// 计数: 第几次收到`ReceiveTimeout`
var i = 1
// 为该actor设置超时时间
context.setReceiveTimeout(1 seconds)
/**
* `receive`是一个偏函数,它的类型是PartialFunction[Any, Unit]
* Scala中的"match/case clause"就是一个偏函数
*
* 如果该actor在设定的超时时间内没有收到任何消息,
* 则系统会自动向它发送一个`ReceiveTimeout`消息
* */
def receive: PartialFunction[Any, Unit] = {
case RequestMsg(msg) =>
println(s"[${new Date}] Actor A : received a message [$msg]")
sender ! "Hello"
case ReceiveTimeout =>
println(s"[${new Date}] Actor A : #$i ReceiveTimeout exception")
if (i >= 3) // 传入`Duration.Undefined`可以取消接收消息超时的机制
context.setReceiveTimeout(Duration.Undefined)
else
i += 1
}
}
}
值得注意的是:如果一直没有收到消息,则Akka System会每隔一段时间向该actor发送ReceiveTimeout
消息。如果向context.setReceiveTimeout
传入参数 Duration.Undefined
则可以取消接收消息超时的机制。