@MiloXia
2015-09-09T10:12:10.000000Z
字数 16022
阅读 13350
akka
容错:不是抓住系统所有的错误并恢复,而是将错误(崩溃)孤立出来,不会导致整个系统崩溃(隔离故障组件),备份组件可以替换崩溃组件(冗余)(可恢复性)
容错方式:Restart, Resume, Stop, Escalate
let it crash原则
崩溃原因:网络,第三方服务,硬件故障
Akka容错:分离业务逻辑(receive)和容错逻辑(supervisorStrategy)
父actor自动成为子actor的supervisor
supervisor不fix子actor,而是简单的呈现如何恢复的一个判断==>
List(Restart, //重启并替换原actor,mailbox消息可继续发送,
//但是接收会暂停至替换完成,重启默认重启所有子actor
Resume, //同一个actor不重启,忽略崩溃,继续处理下一个消息
Stop, //terminated 不再处理任何消息,剩余消息会进入死信信箱
Escalate//交给上层处理
)
Akka提供两种恢复策略:
OneForOneStrategy: 只针对单独actor
override def supervisorStrategy = OneForOneStrategy() {
case _:XXXException => Restart
}
AllForOneStrategy: 对所有节点,一个节点重启,其它节点全部重启
不处理TheadDeath OOM等JVM Error,一直往上抛直到actorSystem顶层actor user guardian 会shutdown ,可通过配置使其优雅shutdown
scaling up 垂直扩展:某节点可以在单机(单JVM)上运行更多的数量
scaling out 水平扩展:某节点可以在多台机器上运行
Akka 按地址(local, remote)投递message, 可以方便实现scaling out
actor不知道是否是和remote actor通信,local, remote区别在于配置,所以Akka可以透明的从scaling up转向scaling out(仅通过修改几行代码) 所以可以像积木一样随意组装
/--
在单JVM中上锁访问的可变状态(hhtp-session) 在scaling out时,直接写入一个数据库是最简单的方式(同库集成)
但是这意味着修改大量代码;
Akka用不可变消息解决了以上问题
Akka actor 在dispatcher(底层调度执行pool)之上运行,dispatcher和配置直接映射(不同dispatcher类型可被选择),所以在scaling up时,只修改配置,不需要动代码,其次actor和线程不是一一映射导致其非常轻量(比线程占用空间更少)
/---
RPC 缺点 点对点通信不适合大规模集群(网络拓扑结构会很复杂,并伴随负载)
面向消息的中间件可以解决这个问题,but代价是应用层混入消息系统(消息中间件必须跟着应用演变)
Akka 使用分布式编程模型(Actor(抽象了本地和分布式环境))在scaling out时,顶层看起来是一样的(透明)
分布式术语:
节点:通过网络通信的应用
节点角色:不同的节点执行不同的任务
通信协议:消息被编码和解码为特定通信协议的报文用于节点的通信
序列化和反序列化:消息的编码和解码
membership:同一个分布式系统的节点(可动态可静态)
dynamic membership:节点数动态变化
Q:分布式环境难在哪里?
A:时延,内存访问,局部失败和并发性
//Akka remote example
object TestRemote extends App {
//////////////backend
val confBackend =
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
}
"""
val configBackend = ConfigFactory parseString confBackend
val backend = ActorSystem("backend", configBackend)//backend listens on address akka.tcp://backend@127.0.0.1:2551
backend.actorOf(Props[ConsoleActor], "console")
//////////////frontend
val confFrontend =
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
"""
val configFrontend = ConfigFactory parseString confFrontend
val frontend = ActorSystem("frontend", configFrontend)//frontend listens on address akka.tcp://backend@127.0.0.1:2552
val path = "akka.tcp://backend@127.0.0.1:2551/user/console"
val console = frontend.actorSelection(path)
console ! "Hello World"
}
//
class ConsoleActor extends Actor {
def receive = {
case m => println(s"received $m")
}
}
actorSelection
actorSelection方法会在本地创建一个远程actor的代理,代理来处理和监听远端actor的重启消息接收等,可以有容错作用,通过actorSelection直接获取actorRef 也可直接通信,但是远端actor崩溃重启,actorRef不会自动定位(容错不好,性能不错)
Akka 还可以远程部署actor
可通过配置和代码两种方式,远端actor崩溃重启,actorRef不会自动定位
远程部署actor容错:需要自己watch actor, 并且在远端actor重启时,重新deploy and watch
采用非远程部署通过actorSelection获取远程actorRef无法实现watch
通过actorSelection获得远程actorRef方法:val console = context.system.actorSelection(path)
val actorRef = Await.result(console.resolveOne(), 5.seconds)
函数式并发
Actor基于消息,他们是长久存在的对象,当某事(消息)发生时作出行为
Future用函数替代对象,是某函数异步执行的结果的占位符
import scala.concurrent._
import ExecutionContext.Implicits.global //提供执行线程池 隐转
val res = future {
1
} map { r =>
"res=" + r
} foreach { r =>
println(r)
}
//Future[Int] ==> Future[String]
//Future[T]和Option[T]类似
Future 错误处理
future {...}代码块内报异常 后续操作将直接不进行
要想获取错误值用onComplete方法
val fres = future {
throw new Exception("error")
1
} map { r =>
"res=" + r
} onComplete {
case Success(r) => println(r)
case Failure(NonFatal(e)) => println(e)
}//和ajax类似
while(true){} //需要堵塞
Future 恢复(容错)
通过recover方法 可以定义在发生特定错误时的处理逻辑
val f = future {
throw new IllegalArgumentException("error")
1
} map { r =>
"res=" + r
} recover {
case e:IllegalArgumentException => 2 //返回某默认值
} onComplete {
case Success(r) => println(r)
case Failure(NonFatal(e)) => e.printStackTrace()
}
while(true){}
////////
val f1 = doAsyc1()
val f2 = f1.flatMap { r =>
doAsyc2(r).recover {
case e:XXXException => r
}
}
//onSuccess & onFailure
val f: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
f onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
f onFailure {
case t => println("Could not process file: " + t.getMessage)
}
Future 组合
//1. firstCompletedOf
val f1 = future{1}
val f2 = future{"res="}
val f3 = Seq(f1, f2)
val f4 = Future.firstCompletedOf(f3) //取先执行完的结果
f4.onComplete {
case Success(r) => println(r)
case Failure(NonFatal(e)) => e.printStackTrace()
}
//2. zip
//Future[Int] zip Future[String] ==> Future[(Int, String)]
val f1 = future{1}
val f2 = future{"res="}
f1 zip f2 foreach(println) //等待一起执行完 并组合结果
===> (1,res=)
val f3 = f1 zip f2 map { case(i,s) =>
s+i
}
//3. sequence traverse
//Seq[Future[Int]] ==> Future[Seq[Int]] 多值映射为单值
val f = Seq(1,2,3) map { i =>
future{i}
}
Future.sequence(f) foreach println
/////////
val f = Future.traverse(Seq(1,2,3)) { i =>
future{i}
}
f foreach println
//4. fold
val f = Seq(f1, f2)
Future.fold(f)(0) { (sum:Int, i:Int) =>
...
}
//5.andThen
val allposts = mutable.Set[String]()
future {
session.getRecentPosts
} andThen {
posts => allposts ++= posts
} andThen {
posts =>
clearAll()
for (post <- allposts) render(post)
}
/**
* 组合器andThen的用法是出于纯粹的side-effecting目的。
* 经andThen返回的新Future无论原Future成功或失败都会返回与原Future一模一样
* 的结果。一旦原Future完成并返回结果,andThen后跟的代码块就会被调用,
* 且新Future将返回与原Future一样的结果,这确保了多个andThen调用的顺序执行
*/
//6. for
val f1 = future { connection.getCurrentValue(USD) }
val f2 = future { connection.getCurrentValue(CHF) }
val f3 = for {
usd <- f1
chf <- f2
if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
f3 onSuccess {
case _ => println("Purchased " + amount + " CHF")
}
//f3只有当f1和f2都完成计算以后才能完成--
//它以其他两个Future的计算值为前提所以它自己的计算不能更早的开始。
与Akka的结合 主要通过 akka.pattern包来实现
1.ask
actor.ask / ?
import akka.pattern.ask
implicit val timeout = Timeout(5 seconds)
val f = (actor ? Message).mapTo[Int]
f foreach println
2.pipe
用于actor 处理Future的结果
val f = (actor1 ? Message).pipeTo(actor2)
//把上一个结果传递给下一个actor; actor2 接收的消息是值 不是Future引用
Await, Promise with Future
val f = future {
1
} map { r =>
"res=" + r
} recover {
case e:Exception => 2
}
val i = Await.result(f, 5.seconds) //同步等待 直到futrue执行结束 返回值
//promise是一个可写的,可以实现一个future的单一赋值容器
val p = promise[T]
val f = p.future
val prod = future { //生产者
val r = calcResult()
p success r //此时f被写入值
//do other things 同时 下面第19行代码的回调可以执行了
}
val cons = future {
//do something...
f onSuccess { //获取写入值
case r => doSomethingWithRes(r)
}
}
future help doc url: http://docs.scala-lang.org/overviews/core/futures.html
or zh: https://code.csdn.net/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md
--加载
----默认加载applaction.conf, 代码不写则加载akka-actor包下的reference.conf
----加载配置:val sys = ActorSystem("sys",ConfigFactory.load("sys"))
----获取配置:sys.setting.config.getString("myApp.name")
"""sys.conf"""
myApp {
name = "mySystem"
}
--为不同子系统定义配置
----1.提取共享配置:baseConfig.conf
----2.子系统include "baseConfig" 并定义自己的部分 或覆盖公共部分--akka应用日志
----1.在Actor中用 val log = Logging(context.system, this)
----或者:with ActorLogging
----2.配置
akka {
event-handlers = ["akka.event.Logging$DefaultLogger"] #仅仅输出到STDOUT
loglevel = "DEBUG"
}
----自定义日志handler
class MyEventListener extends Actor {
def receive = {
case InistializeLogger(_) => //系统消息
sender ! LoggerInitialized
case Error(cause, logSource, logClass, message) =>
println("ERROR " + message)
case Warning(logSource, logClass, message) =>
println("WARN " + message)
case Info(logSource, logClass, message) =>
println("INFO " + message)
case Debug(logSource, logClass, message) =>
println("DEBUG " + message)
}
}
使用slf4j的eventHandler
akka {
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
loglevel = "DEBUG"
}
--配置akka日志
akka {
loglevel = DEBUG #*必须设置成DEBUG 下面的debug才可以用
log-config-on-start = on #启动时显示用了哪个配置文件
debug {
receive = on #记录actor 接收的消息(user-level级)由akka.event.LoggingReceive处理
autoreceive = on #记录所有自动接收的消息(Kill, PoisonPill)
lifecycle = on #记录actor lifecycle changes
fsm = on #状态机相关
event-stream = on #记录eventSteam (subscribe/unsubscribe)
}
remote {
log-sent-messages = on #记录出站的消息
log-received-messages = on #记录进站的消息
}
}
class MyActor extends Actor with ActorLogging {
def receive = LoggingReceive { //记录接收消息
cae ... => ...
}
}
1.管道和过滤器
--管道Pips:一个进程或线程将处理结果传给下一个进程做额外处理的能力
(Unix |符)
--Pipeline:多个管道组合,大都为串行,akka提供并行
--过滤器:传递给下一个proccess之前的验证逻辑
--Pip&Filter模式:input -pip-> check -pip-> check -> output
每个Filter包含3部分:进站pipe,处理器,出站pipe
进站消息和出站消息必须是一样的(交给下一个处理),所以过滤器的顺序是可以对调的,
--Akka的实现
class Filter1(pipe: ActorRef) extend Actor {
def receive = {
case msg:Message =>
if...cond1
pipe ! msg
}
}
class Filter2(pipe: ActorRef) extend Actor {
def receive = {
case msg:Message =>
if...cond2
pipe ! msg
}
}
2.发散聚合模式
并行的处理问题分 分发器和聚合器两部分 map reduce
Akka 实现 Scatter & Gather 模式
class Filter1(pipe: ActorRef) extend Actor {
def receive = {
case msg:Message =>
val res1 = proccess...
pipe ! res1
}
}
class Filter2(pipe: ActorRef) extend Actor {
def receive = {
case msg:Message =>
val res2 = proccess...
pipe ! res2
}
}
//Scatter
class RecipientList(recipientList: Seq[ActorRef]) extend Actor {
def receive = { //分发给不同的收件人(pipe)
case msg:AnyRef => recipientList.foreach(_ ! msg)
}
}
//Gather 会缓存消息,当消息都接收完了之后再处理,并发给下一个process
class Aggregator(timeout:Duration, pipe:ActorRef) extends Actor {
val msgs = new ListBuffer[Mssage] //缓存消息
def receive = {
case msg:Message => {
msgs.find(_.id == msg.id) match { //当有两条一样时发送
case Some(existMsg) => {
pipe ! existMsg
msgs -= existMsg
}
case None => msgs += msg
}
}
}
}
1.点对点 Actor默认投递方式
单链式:sender -3-2-1-> p2pchannel -3-2-1-> receiver保证消息顺序不会乱
多接收者:sender -3-2-1-> p2pchannel 1->receiver1 | 2->receiver2 | 3->receiver3 每个接收者只处理一种消息
2.发布订阅
概述:发给多个接收者,并且不知道接收者是谁,channel负责跟踪接收者,多个接收者处理同个消息,接收者的数量可动态变动(unSubscribe/Subscribe) 消息有序
场景:发货和库存更新 需要同一份订单消息,并且可并行执行
EventStream: 支持发布订阅channel, 支持接收者动态变动, 可看成多个Publish-Subscribe channels的管理者
Actor接收任何来自EventStream的消息,不需要额外编码,只用receive方法就行
注册和注销自己:
system.eventStream.subscribe(slef, classOf[Message])
system.eventStream.unsubscribe(slef, classOf[Message])
发布消息
system.eventStream.publish(msg)
可订阅多种消息,注销时需要一一注销
2.5. EventBus 一个概念而已
3.自定义EventBus
需要实现三种实体:
Event 消息
Subscriber (提供被注册的能力,EventStream里面就是ActorRef)
Classifier(选择订阅者分发事件,EventStream里面Classifier 就是消息的类型)
Akka有三种可组合的traits 可用来追踪订阅者
LookupClassification:基本分类器,维护了订阅者集合,实现classify方法来从事件中提取classifier
SubchannelClassification: 它希望监听者不只是叶子节点,实现有层级的结构
ScanningClassification:最复杂的一个
他们实现了unSubscribe/Subscribe方法,但是有别的方法需要实现
classify(event:Event):Classifier //从事件中提取分类器
compareSubscribers(a:Subscriber, b:Subscriber):Int //订阅者的比较器
publish(event:Event, subscriber:Subscriber) //
mapSize:Int //返回不同classifier的数目
class MessageBus extends EventBus with LookupClassification {
type Event = Message
type Classifier = Boolean //自定义 这里以消息的number字段是不是>1
def mapSize = 2 //true,false 两种
//按类型来,就不需要自定义了
protected def classify(event:MessageBus#Event) = {
event.number > 1
}
}
//test
val bus = new MessageBus
bus.subscribe(actorRef1, false) //监听 < =1
bus.subscribe(actorRef2, true) //监听 > 1
bus.publish(Message(1))
bus.publish(Message(2))
- DeadLetter(死信) channel
概念:只有失败的消息才会进入,监听这里可以帮助发现系统的问题,不可以通过它发送消息
死信:所有不能被处理或者投递(达到)的消息
Akka用EventStream去实现死信队列
system.eventStream.subscribe(monitorRef, classOf[DeadKetter])
actor1 ! PoisonPill //自杀
actor1 ! Message //接收不到
val dead = monitorRef.expectMsgType[DeadLetter]
dead.message
dead.sender //发送人
dead.recipient //收信人
//当Actor不知道怎么处理消息时可以显示发送给deadLetter
system.deadLetters ! msg
5.Cuaranteed deliver channel (属于点对点channel)
概念:保证所有消息发送和投递 分几个等级,但是akka不保证消息100%投递成功
构建系统时,我们需要考虑怎样的程度的担保是足够的
一般规则:消息至多投递一次
Akka保证消息投递一次或者投递失败(不是很好) 但是有两种解决方式:
a. 发送本地消息不太会失败(单JVM发送消息只是方法调用,失败就是报异常了,此时会有别的策略去处理(容错),消息确实不需要到达了),所以单JVM消息投递是可靠的
b. remote actors, 非常可能丢失消息,尤其是在不可靠网络之上,ReliableProxy被用来解决这个问题,它使得发送消息如发送local message一样可靠。
唯一的顾虑是发送和接收者所在JVM
Q: How does the ReliableProxy work?
A: 当开启ReliableProxy(Actor) 在不同的节点间创建了一个隧道
{client-node: sender -> ReliableProxy} ---> {server-node: egress -> service }
Egress是一个被ReliableProxy启动的Actor 两个Actor都实现了check resend 功能去跟踪哪个消息被远程接收者接收;
当消息投递失败时,ReliableProxy会重试,直到Egress接收到,Egress转发给真正的接收者;
当目标actor终止时,本地ReliableProxy将会终止
ReliableProxy限制:只能正对一个接收者,并且单向(若要回发,会再启两个代理)
val remoteActorRef = system.actorFor(path)
val proxy = system.actorOf(Props(new ReliableProxy(remoteActorRef,500.millis)), "proxy")
proxy ! Message
//akka 2.3.8之后不可用system.actorFor方法,只能用actorSelection方法,而Selection底层就是用的Proxy
val s = Ref(Set(1,2,3))
val reservedS = atomic { implicit tx = {
val head = s().head
s() = s().tail
head
}}
actors
address --> ActorRef 替换直接引用(内存引用)
mailboxes
ActorCell.newActor 会委托props.newActor创建,并将behaviorStack(为毛是栈?为了实现become和unbecome)head 设置为instance.receive。
actorRef ! Message ; 调用ActorCell.tell --> dispatcher.dispath(分发)-->将消息放入ActorCell的队列mbox.enqueue里,并executor.execute(mbox) -->执行mbox的run方法(mailbox被分发到某线程上)-->先处理系统消息(mailBox.processAllSystemMessages),再处理用户消息(mailBox.processMailbox) --> dequeue队列获得消息--> actor invoke next --> ActorCell.recevieMessage(msg) --> Actor.aroundReceive(behaviorStack.head, msg)执行receive方法
...
目标:全自动的管理actor集群, 负载均衡,故障处理
目前的features:
--membership:容错的membership
--负载均衡:根据路由算法 路由actors的message
--节点分区:节点可以有自己的角色,使路由器可以配置为只给某角色的节点发消息
--Partition points(分区点):一个ActorSystem 可以有一部分子树在别的节点上(目前只有top level actors可以这样)
不提供: 状态冗余,重分区,重负载
最适用:单一目的的数据处理应用(图像处理,实时数据分析)
种子节点(是一种节点角色):
a.最基本的用于启动集群的节点(其实是冗余的控制节点); b.没有任何的actors,是个纯的节点;
c.是其它节点的中间联系人
d.要启动akka cluster 必须配置一系列的种子节点,并且第一个种子节点有特殊的角色,First seed Node必须最先启动;
e.其它种子节点可以和第一种子节点一起启动,它们会等待第一种子节点启动好
f.其它成功join启动集群后,第一种子节点可以安全的离开集群
注:种子节点不是必须的,你可以手动启动一个节点,让它自己加入自己编程集群,这样就是手动需要了解地址等
配置:
akka {
loglevel = INFO
stdout-loglevel = INFO
event-handlers = ["akka.event.Logging$DefaultLogger"]
actor { #修改provider
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
log-remote-lifecycle-events = off
netty.tcp {
hostname = ""
host = ${HOST} #每个节点的主机
port = ${PORT} #每个节点的监听端口
}
}
cluster {
seed-nodes = ["akka.tcp://words@127.0.0.1:2551",#First seed Node
"akka.tcp://words@127.0.0.1:2552", #actorsystem名称必须相同
"akka.tcp://words@127.0.0.1:2553"] #种子节点列表
roles = ["seed"]
}
}
此时启动三个JVM配置不同的port,就创建三个seed node了,并且自动组成cluster
//first seed node
val seedConfig = ConfigFactory.load("seed")
val seedSystem = ActorSystem("words", seedConfig)
//启动后自己join自己
//状态变化:Join -> Up
//同样的代码启动其它两个种子节点
//first seed node 离开集群
val address = Cluster(seedSystem).selfAddress
Cluster(seedSystem).leave(address)
//状态变为Leaving -> Exiting -> Unreachable
在first seed node Exiting状态时,seed node2自动变成leader(处理Join请求)
节点的状态(Joining,Up...)变化通知,会在所有节点间传递
Gossip(闲聊) 协议:
在节点之间传递节点状态的协议,每个节点描述自己的状态和它看到的别的节点的状态,最后节点的状态会收敛成一致状态,每次收敛之后都可以确定Leader
first seed node退出后,不能Cluster(seedSystem).join(selfAddress),而是应该重启
seedSystem.shutdown
val seedSystem = ActorSystem("words", seedConfig)
//一个actorsystem只能加入cluster一次,但是可以以相同的配置启动一个新actorsystem
Akka Cluster会探测Unreachable的节点,并卸载(down),leader节点在Unreachable无法正常工作,可以用down方法,卸载任意一个节点
val address = Address("akka.tcp", "words", "127.0.0.1", 2551)
Cluster(seedSystem).down(address)
节点状态转换:
-join-> [Joining] -leader action-> [Up] -leave-> [Leaving] -leader action->
[Exiting] -leader action-> [Unreachable] -down-> [Down] --> Removed
订阅Cluster Domain Events
class ClusterDomainEventListener extends Actor with ActorLogging {
Cluster(context.system).subscribe(self, classOf[ClusterDomainEvent])
def receive ={
case MemberUp(member) => log.info(s"$member UP.")
case MemberExited(member) => log.info(s"$member EXITED.")
case MemberRemoved(m, previousState) =>
if(previousState == MemberStatus.Exiting) {
log.info(s"Member $m gracefully exited, REMOVED.")
} else {
log.info(s"$m downed after unreachable, REMOVED.")
}
case UnreachableMember(m) => log.info(s"$m UNREACHABLE")
case ReachableMember(m) => log.info(s"$m REACHABLE")
case s: CurrentClusterState => log.info(s"cluster state: $s")
}
override def postStop(): Unit = {
Cluster(context.system).unsubscribe(self)
super.postStop()
}
}
为MemberUp事件配置最小数量节点
#在master上配置
role {
worker.min-nr-of-members = 2 #当worker节点数超过两个时 处罚 memberup事件
}
object Main extends App {
val config = ConfigFactory.load()
val system = ActorSystem("words", config)
println(s"Starting node with roles: ${Cluster(system).selfRoles}")
val roles = system.settings.config
.getStringList("akka.cluster.roles")
if(roles.contains("master")) { //当是master节点时 注册事件
Cluster(system).registerOnMemberUp { //worker节点2个时触发
val receptionist = system.actorOf(Props[JobReceptionist],
"receptionist")
println("Master node is ready.")
}
}
}
Routers 集群的路由
和用本地的Router是一样的,用路由去worker node上创建worker actor
trait CreateWorkerRouter { this: Actor =>
def createWorkerRouter: ActorRef = {
context.actorOf(
ClusterRouterPool(BroadcastPool(10), //用Pool来创建 广播路由
ClusterRouterPoolSettings(
totalInstances = 1000, //集群中最多多少个
maxInstancesPerNode = 20, //每个节点最多多少个
allowLocalRoutees = false, //不在本地节点创建, 只在worker节点上创建
useRole = None
)
).props(Props[JobWorker]),
name = "worker-router")
}
}
编写master
class JobMaster extends Actor
with ActorLogging
with CreateWorkerRouter {
// inside the body of the JobMaster actor..
val router = createWorkerRouter
def receive = idle
def idle: Receive = {
case StartJob(jobName, text) =>
textParts = text.grouped(10).toVector //分割文本
val cancel = system.scheduler.schedule( 0 millis, #定期给路由发
1000 millis,
router,
Work(jobName, self))
//发Work消息创建worker
become(working(jobName, sender, cancel))
}
// more code
//TODO
当有状态的actors崩溃时,做数据恢复
点对点消息保证一次投递
JDK ScheduledThreadPoolExecutor 通过DelayedWorkQueue 是一个PriorityQueue
(take 是会堵塞的)优先级为Delay时间, 每次添加task都会重排序,时间短排前面,调度时根据Delay时间判断是否执行
Akka Scheduler 则通过Netty HashedWheelTimer ,通过设置Tick Duration,扫一次Task队列,(会Thread.sleep(sleepMs))
哪个性能更好??? 不知道