@MiloXia
2015-09-09T02:12:10.000000Z
字数 16022
阅读 13692
akka
容错:不是抓住系统所有的错误并恢复,而是将错误(崩溃)孤立出来,不会导致整个系统崩溃(隔离故障组件),备份组件可以替换崩溃组件(冗余)(可恢复性)
容错方式:Restart, Resume, Stop, Escalate
let it crash原则
崩溃原因:网络,第三方服务,硬件故障
Akka容错:分离业务逻辑(receive)和容错逻辑(supervisorStrategy)
父actor自动成为子actor的supervisor
supervisor不fix子actor,而是简单的呈现如何恢复的一个判断==>
List(Restart, //重启并替换原actor,mailbox消息可继续发送,
//但是接收会暂停至替换完成,重启默认重启所有子actor
Resume, //同一个actor不重启,忽略崩溃,继续处理下一个消息
Stop, //terminated 不再处理任何消息,剩余消息会进入死信信箱
Escalate//交给上层处理
)
Akka提供两种恢复策略:
OneForOneStrategy: 只针对单独actor
override def supervisorStrategy = OneForOneStrategy() {case _:XXXException => Restart}
AllForOneStrategy: 对所有节点,一个节点重启,其它节点全部重启
不处理TheadDeath OOM等JVM Error,一直往上抛直到actorSystem顶层actor user guardian 会shutdown ,可通过配置使其优雅shutdown
scaling up 垂直扩展:某节点可以在单机(单JVM)上运行更多的数量
scaling out 水平扩展:某节点可以在多台机器上运行
Akka 按地址(local, remote)投递message, 可以方便实现scaling out
actor不知道是否是和remote actor通信,local, remote区别在于配置,所以Akka可以透明的从scaling up转向scaling out(仅通过修改几行代码) 所以可以像积木一样随意组装
/--
在单JVM中上锁访问的可变状态(hhtp-session) 在scaling out时,直接写入一个数据库是最简单的方式(同库集成)
但是这意味着修改大量代码;
Akka用不可变消息解决了以上问题
Akka actor 在dispatcher(底层调度执行pool)之上运行,dispatcher和配置直接映射(不同dispatcher类型可被选择),所以在scaling up时,只修改配置,不需要动代码,其次actor和线程不是一一映射导致其非常轻量(比线程占用空间更少)
/---
RPC 缺点 点对点通信不适合大规模集群(网络拓扑结构会很复杂,并伴随负载)
面向消息的中间件可以解决这个问题,but代价是应用层混入消息系统(消息中间件必须跟着应用演变)
Akka 使用分布式编程模型(Actor(抽象了本地和分布式环境))在scaling out时,顶层看起来是一样的(透明)
分布式术语:
节点:通过网络通信的应用
节点角色:不同的节点执行不同的任务
通信协议:消息被编码和解码为特定通信协议的报文用于节点的通信
序列化和反序列化:消息的编码和解码
membership:同一个分布式系统的节点(可动态可静态)
dynamic membership:节点数动态变化
Q:分布式环境难在哪里?
A:时延,内存访问,局部失败和并发性
//Akka remote exampleobject TestRemote extends App {//////////////backendval confBackend ="""akka {actor {provider = "akka.remote.RemoteActorRefProvider"}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2551}}}"""val configBackend = ConfigFactory parseString confBackendval backend = ActorSystem("backend", configBackend)//backend listens on address akka.tcp://backend@127.0.0.1:2551backend.actorOf(Props[ConsoleActor], "console")//////////////frontendval confFrontend ="""akka {actor {provider = "akka.remote.RemoteActorRefProvider"}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2552}}}"""val configFrontend = ConfigFactory parseString confFrontendval frontend = ActorSystem("frontend", configFrontend)//frontend listens on address akka.tcp://backend@127.0.0.1:2552val path = "akka.tcp://backend@127.0.0.1:2551/user/console"val console = frontend.actorSelection(path)console ! "Hello World"}//class ConsoleActor extends Actor {def receive = {case m => println(s"received $m")}}
actorSelection
actorSelection方法会在本地创建一个远程actor的代理,代理来处理和监听远端actor的重启消息接收等,可以有容错作用,通过actorSelection直接获取actorRef 也可直接通信,但是远端actor崩溃重启,actorRef不会自动定位(容错不好,性能不错)
Akka 还可以远程部署actor
可通过配置和代码两种方式,远端actor崩溃重启,actorRef不会自动定位
远程部署actor容错:需要自己watch actor, 并且在远端actor重启时,重新deploy and watch
采用非远程部署通过actorSelection获取远程actorRef无法实现watch
通过actorSelection获得远程actorRef方法:val console = context.system.actorSelection(path)
val actorRef = Await.result(console.resolveOne(), 5.seconds)
函数式并发
Actor基于消息,他们是长久存在的对象,当某事(消息)发生时作出行为
Future用函数替代对象,是某函数异步执行的结果的占位符
import scala.concurrent._import ExecutionContext.Implicits.global //提供执行线程池 隐转val res = future {1} map { r =>"res=" + r} foreach { r =>println(r)}//Future[Int] ==> Future[String]//Future[T]和Option[T]类似
Future 错误处理
future {...}代码块内报异常 后续操作将直接不进行
要想获取错误值用onComplete方法
val fres = future {throw new Exception("error")1} map { r =>"res=" + r} onComplete {case Success(r) => println(r)case Failure(NonFatal(e)) => println(e)}//和ajax类似while(true){} //需要堵塞
Future 恢复(容错)
通过recover方法 可以定义在发生特定错误时的处理逻辑
val f = future {throw new IllegalArgumentException("error")1} map { r =>"res=" + r} recover {case e:IllegalArgumentException => 2 //返回某默认值} onComplete {case Success(r) => println(r)case Failure(NonFatal(e)) => e.printStackTrace()}while(true){}////////val f1 = doAsyc1()val f2 = f1.flatMap { r =>doAsyc2(r).recover {case e:XXXException => r}}//onSuccess & onFailureval f: Future[Int] = future {val source = scala.io.Source.fromFile("myText.txt")source.toSeq.indexOfSlice("myKeyword")}f onSuccess {case idx => println("The keyword first appears at position: " + idx)}f onFailure {case t => println("Could not process file: " + t.getMessage)}
Future 组合
//1. firstCompletedOfval f1 = future{1}val f2 = future{"res="}val f3 = Seq(f1, f2)val f4 = Future.firstCompletedOf(f3) //取先执行完的结果f4.onComplete {case Success(r) => println(r)case Failure(NonFatal(e)) => e.printStackTrace()}//2. zip//Future[Int] zip Future[String] ==> Future[(Int, String)]val f1 = future{1}val f2 = future{"res="}f1 zip f2 foreach(println) //等待一起执行完 并组合结果===> (1,res=)val f3 = f1 zip f2 map { case(i,s) =>s+i}//3. sequence traverse//Seq[Future[Int]] ==> Future[Seq[Int]] 多值映射为单值val f = Seq(1,2,3) map { i =>future{i}}Future.sequence(f) foreach println/////////val f = Future.traverse(Seq(1,2,3)) { i =>future{i}}f foreach println//4. foldval f = Seq(f1, f2)Future.fold(f)(0) { (sum:Int, i:Int) =>...}//5.andThenval allposts = mutable.Set[String]()future {session.getRecentPosts} andThen {posts => allposts ++= posts} andThen {posts =>clearAll()for (post <- allposts) render(post)}/*** 组合器andThen的用法是出于纯粹的side-effecting目的。* 经andThen返回的新Future无论原Future成功或失败都会返回与原Future一模一样* 的结果。一旦原Future完成并返回结果,andThen后跟的代码块就会被调用,* 且新Future将返回与原Future一样的结果,这确保了多个andThen调用的顺序执行*///6. forval f1 = future { connection.getCurrentValue(USD) }val f2 = future { connection.getCurrentValue(CHF) }val f3 = for {usd <- f1chf <- f2if isProfitable(usd, chf)} yield connection.buy(amount, chf)f3 onSuccess {case _ => println("Purchased " + amount + " CHF")}//f3只有当f1和f2都完成计算以后才能完成--//它以其他两个Future的计算值为前提所以它自己的计算不能更早的开始。
与Akka的结合 主要通过 akka.pattern包来实现
1.ask
actor.ask / ?
import akka.pattern.askimplicit val timeout = Timeout(5 seconds)val f = (actor ? Message).mapTo[Int]f foreach println
2.pipe
用于actor 处理Future的结果
val f = (actor1 ? Message).pipeTo(actor2)//把上一个结果传递给下一个actor; actor2 接收的消息是值 不是Future引用
Await, Promise with Future
val f = future {1} map { r =>"res=" + r} recover {case e:Exception => 2}val i = Await.result(f, 5.seconds) //同步等待 直到futrue执行结束 返回值//promise是一个可写的,可以实现一个future的单一赋值容器val p = promise[T]val f = p.futureval prod = future { //生产者val r = calcResult()p success r //此时f被写入值//do other things 同时 下面第19行代码的回调可以执行了}val cons = future {//do something...f onSuccess { //获取写入值case r => doSomethingWithRes(r)}}
future help doc url: http://docs.scala-lang.org/overviews/core/futures.html
or zh: https://code.csdn.net/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md
--加载
----默认加载applaction.conf, 代码不写则加载akka-actor包下的reference.conf
----加载配置:val sys = ActorSystem("sys",ConfigFactory.load("sys"))
----获取配置:sys.setting.config.getString("myApp.name")
"""sys.conf"""myApp {name = "mySystem"}
--为不同子系统定义配置
----1.提取共享配置:baseConfig.conf
----2.子系统include "baseConfig" 并定义自己的部分 或覆盖公共部分--akka应用日志
----1.在Actor中用 val log = Logging(context.system, this)
----或者:with ActorLogging
----2.配置
akka {event-handlers = ["akka.event.Logging$DefaultLogger"] #仅仅输出到STDOUTloglevel = "DEBUG"}
----自定义日志handler
class MyEventListener extends Actor {def receive = {case InistializeLogger(_) => //系统消息sender ! LoggerInitializedcase Error(cause, logSource, logClass, message) =>println("ERROR " + message)case Warning(logSource, logClass, message) =>println("WARN " + message)case Info(logSource, logClass, message) =>println("INFO " + message)case Debug(logSource, logClass, message) =>println("DEBUG " + message)}}
使用slf4j的eventHandler
akka {event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]loglevel = "DEBUG"}
--配置akka日志
akka {loglevel = DEBUG #*必须设置成DEBUG 下面的debug才可以用log-config-on-start = on #启动时显示用了哪个配置文件debug {receive = on #记录actor 接收的消息(user-level级)由akka.event.LoggingReceive处理autoreceive = on #记录所有自动接收的消息(Kill, PoisonPill)lifecycle = on #记录actor lifecycle changesfsm = on #状态机相关event-stream = on #记录eventSteam (subscribe/unsubscribe)}remote {log-sent-messages = on #记录出站的消息log-received-messages = on #记录进站的消息}}
class MyActor extends Actor with ActorLogging {def receive = LoggingReceive { //记录接收消息cae ... => ...}}
1.管道和过滤器
--管道Pips:一个进程或线程将处理结果传给下一个进程做额外处理的能力
(Unix |符)
--Pipeline:多个管道组合,大都为串行,akka提供并行
--过滤器:传递给下一个proccess之前的验证逻辑
--Pip&Filter模式:input -pip-> check -pip-> check -> output
每个Filter包含3部分:进站pipe,处理器,出站pipe
进站消息和出站消息必须是一样的(交给下一个处理),所以过滤器的顺序是可以对调的,
--Akka的实现
class Filter1(pipe: ActorRef) extend Actor {def receive = {case msg:Message =>if...cond1pipe ! msg}}class Filter2(pipe: ActorRef) extend Actor {def receive = {case msg:Message =>if...cond2pipe ! msg}}
2.发散聚合模式
并行的处理问题分 分发器和聚合器两部分 map reduce
Akka 实现 Scatter & Gather 模式
class Filter1(pipe: ActorRef) extend Actor {def receive = {case msg:Message =>val res1 = proccess...pipe ! res1}}class Filter2(pipe: ActorRef) extend Actor {def receive = {case msg:Message =>val res2 = proccess...pipe ! res2}}//Scatterclass RecipientList(recipientList: Seq[ActorRef]) extend Actor {def receive = { //分发给不同的收件人(pipe)case msg:AnyRef => recipientList.foreach(_ ! msg)}}//Gather 会缓存消息,当消息都接收完了之后再处理,并发给下一个processclass Aggregator(timeout:Duration, pipe:ActorRef) extends Actor {val msgs = new ListBuffer[Mssage] //缓存消息def receive = {case msg:Message => {msgs.find(_.id == msg.id) match { //当有两条一样时发送case Some(existMsg) => {pipe ! existMsgmsgs -= existMsg}case None => msgs += msg}}}}
1.点对点 Actor默认投递方式
单链式:sender -3-2-1-> p2pchannel -3-2-1-> receiver保证消息顺序不会乱
多接收者:sender -3-2-1-> p2pchannel 1->receiver1 | 2->receiver2 | 3->receiver3 每个接收者只处理一种消息
2.发布订阅
概述:发给多个接收者,并且不知道接收者是谁,channel负责跟踪接收者,多个接收者处理同个消息,接收者的数量可动态变动(unSubscribe/Subscribe) 消息有序
场景:发货和库存更新 需要同一份订单消息,并且可并行执行
EventStream: 支持发布订阅channel, 支持接收者动态变动, 可看成多个Publish-Subscribe channels的管理者
Actor接收任何来自EventStream的消息,不需要额外编码,只用receive方法就行
注册和注销自己:
system.eventStream.subscribe(slef, classOf[Message])system.eventStream.unsubscribe(slef, classOf[Message])
发布消息
system.eventStream.publish(msg)
可订阅多种消息,注销时需要一一注销
2.5. EventBus 一个概念而已
3.自定义EventBus
需要实现三种实体:
Event 消息
Subscriber (提供被注册的能力,EventStream里面就是ActorRef)
Classifier(选择订阅者分发事件,EventStream里面Classifier 就是消息的类型)
Akka有三种可组合的traits 可用来追踪订阅者
LookupClassification:基本分类器,维护了订阅者集合,实现classify方法来从事件中提取classifier
SubchannelClassification: 它希望监听者不只是叶子节点,实现有层级的结构
ScanningClassification:最复杂的一个
他们实现了unSubscribe/Subscribe方法,但是有别的方法需要实现
classify(event:Event):Classifier //从事件中提取分类器
compareSubscribers(a:Subscriber, b:Subscriber):Int //订阅者的比较器
publish(event:Event, subscriber:Subscriber) //
mapSize:Int //返回不同classifier的数目
class MessageBus extends EventBus with LookupClassification {type Event = Messagetype Classifier = Boolean //自定义 这里以消息的number字段是不是>1def mapSize = 2 //true,false 两种//按类型来,就不需要自定义了protected def classify(event:MessageBus#Event) = {event.number > 1}}//testval bus = new MessageBusbus.subscribe(actorRef1, false) //监听 < =1bus.subscribe(actorRef2, true) //监听 > 1bus.publish(Message(1))bus.publish(Message(2))
- DeadLetter(死信) channel
概念:只有失败的消息才会进入,监听这里可以帮助发现系统的问题,不可以通过它发送消息
死信:所有不能被处理或者投递(达到)的消息
Akka用EventStream去实现死信队列
system.eventStream.subscribe(monitorRef, classOf[DeadKetter])actor1 ! PoisonPill //自杀actor1 ! Message //接收不到val dead = monitorRef.expectMsgType[DeadLetter]dead.messagedead.sender //发送人dead.recipient //收信人//当Actor不知道怎么处理消息时可以显示发送给deadLettersystem.deadLetters ! msg
5.Cuaranteed deliver channel (属于点对点channel)
概念:保证所有消息发送和投递 分几个等级,但是akka不保证消息100%投递成功
构建系统时,我们需要考虑怎样的程度的担保是足够的
一般规则:消息至多投递一次
Akka保证消息投递一次或者投递失败(不是很好) 但是有两种解决方式:
a. 发送本地消息不太会失败(单JVM发送消息只是方法调用,失败就是报异常了,此时会有别的策略去处理(容错),消息确实不需要到达了),所以单JVM消息投递是可靠的
b. remote actors, 非常可能丢失消息,尤其是在不可靠网络之上,ReliableProxy被用来解决这个问题,它使得发送消息如发送local message一样可靠。
唯一的顾虑是发送和接收者所在JVM
Q: How does the ReliableProxy work?
A: 当开启ReliableProxy(Actor) 在不同的节点间创建了一个隧道
{client-node: sender -> ReliableProxy} ---> {server-node: egress -> service }
Egress是一个被ReliableProxy启动的Actor 两个Actor都实现了check resend 功能去跟踪哪个消息被远程接收者接收;
当消息投递失败时,ReliableProxy会重试,直到Egress接收到,Egress转发给真正的接收者;
当目标actor终止时,本地ReliableProxy将会终止
ReliableProxy限制:只能正对一个接收者,并且单向(若要回发,会再启两个代理)
val remoteActorRef = system.actorFor(path)val proxy = system.actorOf(Props(new ReliableProxy(remoteActorRef,500.millis)), "proxy")proxy ! Message//akka 2.3.8之后不可用system.actorFor方法,只能用actorSelection方法,而Selection底层就是用的Proxy
val s = Ref(Set(1,2,3))val reservedS = atomic { implicit tx = {val head = s().heads() = s().tailhead}}
actors
address --> ActorRef 替换直接引用(内存引用)
mailboxes
ActorCell.newActor 会委托props.newActor创建,并将behaviorStack(为毛是栈?为了实现become和unbecome)head 设置为instance.receive。
actorRef ! Message ; 调用ActorCell.tell --> dispatcher.dispath(分发)-->将消息放入ActorCell的队列mbox.enqueue里,并executor.execute(mbox) -->执行mbox的run方法(mailbox被分发到某线程上)-->先处理系统消息(mailBox.processAllSystemMessages),再处理用户消息(mailBox.processMailbox) --> dequeue队列获得消息--> actor invoke next --> ActorCell.recevieMessage(msg) --> Actor.aroundReceive(behaviorStack.head, msg)执行receive方法
...
目标:全自动的管理actor集群, 负载均衡,故障处理
目前的features:
--membership:容错的membership
--负载均衡:根据路由算法 路由actors的message
--节点分区:节点可以有自己的角色,使路由器可以配置为只给某角色的节点发消息
--Partition points(分区点):一个ActorSystem 可以有一部分子树在别的节点上(目前只有top level actors可以这样)
不提供: 状态冗余,重分区,重负载
最适用:单一目的的数据处理应用(图像处理,实时数据分析)
种子节点(是一种节点角色):
a.最基本的用于启动集群的节点(其实是冗余的控制节点); b.没有任何的actors,是个纯的节点;
c.是其它节点的中间联系人
d.要启动akka cluster 必须配置一系列的种子节点,并且第一个种子节点有特殊的角色,First seed Node必须最先启动;
e.其它种子节点可以和第一种子节点一起启动,它们会等待第一种子节点启动好
f.其它成功join启动集群后,第一种子节点可以安全的离开集群
注:种子节点不是必须的,你可以手动启动一个节点,让它自己加入自己编程集群,这样就是手动需要了解地址等
配置:
akka {loglevel = INFOstdout-loglevel = INFOevent-handlers = ["akka.event.Logging$DefaultLogger"]actor { #修改providerprovider = "akka.cluster.ClusterActorRefProvider"}remote {enabled-transports = ["akka.remote.netty.tcp"]log-remote-lifecycle-events = offnetty.tcp {hostname = ""host = ${HOST} #每个节点的主机port = ${PORT} #每个节点的监听端口}}cluster {seed-nodes = ["akka.tcp://words@127.0.0.1:2551",#First seed Node"akka.tcp://words@127.0.0.1:2552", #actorsystem名称必须相同"akka.tcp://words@127.0.0.1:2553"] #种子节点列表roles = ["seed"]}}
此时启动三个JVM配置不同的port,就创建三个seed node了,并且自动组成cluster
//first seed nodeval seedConfig = ConfigFactory.load("seed")val seedSystem = ActorSystem("words", seedConfig)//启动后自己join自己//状态变化:Join -> Up//同样的代码启动其它两个种子节点//first seed node 离开集群val address = Cluster(seedSystem).selfAddressCluster(seedSystem).leave(address)//状态变为Leaving -> Exiting -> Unreachable
在first seed node Exiting状态时,seed node2自动变成leader(处理Join请求)
节点的状态(Joining,Up...)变化通知,会在所有节点间传递
Gossip(闲聊) 协议:
在节点之间传递节点状态的协议,每个节点描述自己的状态和它看到的别的节点的状态,最后节点的状态会收敛成一致状态,每次收敛之后都可以确定Leader
first seed node退出后,不能Cluster(seedSystem).join(selfAddress),而是应该重启
seedSystem.shutdownval seedSystem = ActorSystem("words", seedConfig)//一个actorsystem只能加入cluster一次,但是可以以相同的配置启动一个新actorsystem
Akka Cluster会探测Unreachable的节点,并卸载(down),leader节点在Unreachable无法正常工作,可以用down方法,卸载任意一个节点
val address = Address("akka.tcp", "words", "127.0.0.1", 2551)Cluster(seedSystem).down(address)
节点状态转换:
-join-> [Joining] -leader action-> [Up] -leave-> [Leaving] -leader action->
[Exiting] -leader action-> [Unreachable] -down-> [Down] --> Removed
订阅Cluster Domain Events
class ClusterDomainEventListener extends Actor with ActorLogging {Cluster(context.system).subscribe(self, classOf[ClusterDomainEvent])def receive ={case MemberUp(member) => log.info(s"$member UP.")case MemberExited(member) => log.info(s"$member EXITED.")case MemberRemoved(m, previousState) =>if(previousState == MemberStatus.Exiting) {log.info(s"Member $m gracefully exited, REMOVED.")} else {log.info(s"$m downed after unreachable, REMOVED.")}case UnreachableMember(m) => log.info(s"$m UNREACHABLE")case ReachableMember(m) => log.info(s"$m REACHABLE")case s: CurrentClusterState => log.info(s"cluster state: $s")}override def postStop(): Unit = {Cluster(context.system).unsubscribe(self)super.postStop()}}
为MemberUp事件配置最小数量节点
#在master上配置role {worker.min-nr-of-members = 2 #当worker节点数超过两个时 处罚 memberup事件}
object Main extends App {val config = ConfigFactory.load()val system = ActorSystem("words", config)println(s"Starting node with roles: ${Cluster(system).selfRoles}")val roles = system.settings.config.getStringList("akka.cluster.roles")if(roles.contains("master")) { //当是master节点时 注册事件Cluster(system).registerOnMemberUp { //worker节点2个时触发val receptionist = system.actorOf(Props[JobReceptionist],"receptionist")println("Master node is ready.")}}}
Routers 集群的路由
和用本地的Router是一样的,用路由去worker node上创建worker actor
trait CreateWorkerRouter { this: Actor =>def createWorkerRouter: ActorRef = {context.actorOf(ClusterRouterPool(BroadcastPool(10), //用Pool来创建 广播路由ClusterRouterPoolSettings(totalInstances = 1000, //集群中最多多少个maxInstancesPerNode = 20, //每个节点最多多少个allowLocalRoutees = false, //不在本地节点创建, 只在worker节点上创建useRole = None)).props(Props[JobWorker]),name = "worker-router")}}
编写master
class JobMaster extends Actorwith ActorLoggingwith CreateWorkerRouter {// inside the body of the JobMaster actor..val router = createWorkerRouterdef receive = idledef idle: Receive = {case StartJob(jobName, text) =>textParts = text.grouped(10).toVector //分割文本val cancel = system.scheduler.schedule( 0 millis, #定期给路由发1000 millis,router,Work(jobName, self))//发Work消息创建workerbecome(working(jobName, sender, cancel))}// more code
//TODO
当有状态的actors崩溃时,做数据恢复
点对点消息保证一次投递
JDK ScheduledThreadPoolExecutor 通过DelayedWorkQueue 是一个PriorityQueue
(take 是会堵塞的)优先级为Delay时间, 每次添加task都会重排序,时间短排前面,调度时根据Delay时间判断是否执行
Akka Scheduler 则通过Netty HashedWheelTimer ,通过设置Tick Duration,扫一次Task队列,(会Thread.sleep(sleepMs))
哪个性能更好??? 不知道