[关闭]
@xtccc 2016-03-04T14:47:37.000000Z 字数 3440 阅读 2614

Actor

给我写信
GitHub

此处输入图片的描述


Scala



Java为并发编程提供的各种方法/库,都基于 shared data / lock model,用起来易错、麻烦,很容易跌入race condition和dead lock的陷阱。 Scala Actor则提供了一个库,具有鲜明的share-nothing/messafe-passing特征,更加易于使用。


目录



1. 基础


1.1 定义并运行一个Actor Object

非常简单,扩展Actor接口即可。

  1. // 需要引入库: org.scala-lang:scala-actors:2.10.4
  2. import scala.actors._
  3. object StartActor {
  4. def main(agrs: Array[String]): Unit = {
  5. println("Starting ...")
  6. SillyActor.start
  7. SeriousActor.start
  8. println("Done ...")
  9. }
  10. }
  11. object SillyActor extends Actor {
  12. override def act(): Unit = {
  13. for (i <- 0 until 5) {
  14. println(s"SillyActor: ${new Date()}")
  15. Thread.sleep(2000)
  16. }
  17. }
  18. }
  19. object SeriousActor extends Actor {
  20. override
  21. def act(): Unit = {
  22. for (i <- 0 until 5) {
  23. println(s"SeriousActor: ${new Date()}")
  24. Thread.sleep(3000)
  25. }
  26. }
  27. }

输出为:
QQ20160222-0@2x.png-105.9kB

可见,这两个actors之间是独立运行的


1.2 运行Actor的方式

除了上面调用start方法之外,还可以直接通过val来创建一个actor方法:

  1. import scala.actors.Actor._
  2. object StartActor extends App {
  3. val myActor = actor { // 定义后立即运行
  4. for (i <- 1 to 5) {
  5. println(i)
  6. Thread.sleep(1000)
  7. }
  8. }
  9. }


1.3 将普通Thread也当做Actor

如果想把当前的线程看作是一个Actor,可以使用Actor.self

  1. println("开始 ...")
  2. self ! 100
  3. self ! "你好"
  4. self.receive{case x: String => println(s"String内容为 [$x]")}
  5. self.receive{case x: Int => println(s"Int内容为 [$x]")}
  6. println("结束 ...")

输出为:
QQ20160225-1@2x.png-21.4kB




2. 消息通讯


2.1 基本的通讯方式(! and ?)

两个Actors之间怎样通讯?通过发送消息。

通过 [actor] ! [msg] 或者 [actor] ? [msg]的形式,可以向actor发送msg

! 代表tell - 向actor发送msg,然后立即返回
? 代表ask - 向actor发送msg,然后返回一个Future

例:

  1. import scala.actors.Actor._
  2. object StartActor extends App {
  3. val myActor = actor { // `myActor`运行后永不退出
  4. while (true) { // 如果没有 while ,那么只能接收一次消息
  5. receive { // actor 通过 `receive` 方法接收消息
  6. case msg => println(s"消息: [$msg]")
  7. }
  8. }
  9. }
  10. println("actor创建完毕")
  11. myActor ! "你好"
  12. myActor ! "hello"
  13. }

当Actor发送消息时,它不会被block;
当Actor接收消息时,他也不会被interrupt;且到来的消息一直在它的消息箱中,直到它调用receive


2.2 Partial Function

receive接收到的实际上是一个partial function:

An actor will only process messages matching one of the cases in the partial function passed to receive. For each message in the mailbox, receive will first invoke isDefinedAt on the passed partial function to determine whether it has a case that will match and handle the message. The receive method will choose the first message in the mailbox for which isDefinedAt returns true, and pass that message to the partial function’s apply method. The partial function’s apply method will handle the message.

例:

  1. import scala.actors.Actor._
  2. val actor1 = actor {
  3. while (true) { // 必须有while,否则只能receive一次消息
  4. receive {
  5. case x: Int => println(s"Type: Int, Value: [$x]")
  6. case x: String => println(s"Type: String, Value: [$x]")
  7. case x: Double => println(s"Type: Double, Value: [$x]")
  8. case _ => println(s"Type: Not allowed")
  9. }
  10. }
  11. }
  12. actor1 ! 100
  13. actor1 ! "你好"
  14. actor1 ! 1.23456
  15. actor1 ! true

输出为:
QQ20160225-0@2x.png-30.9kB


2.3 回应Sender

当Actor a收到Actor b的消息时,b可以向a进行回应。receive收到的数据中,隐含了sender。

例:

  1. import scala.actors.Actor
  2. /**
  3. * Created by tao on 2/25/16.
  4. */
  5. object SenderExample {
  6. def main(args: Array[String]): Unit = {
  7. val master = new Master
  8. val worker = new Worker
  9. master.start
  10. worker.start
  11. master ! ("cmd:send", "今天天气不错,挺风和日丽的", worker)
  12. }
  13. }
  14. class Master extends Actor {
  15. override def act() = {
  16. while (true) {
  17. receive {
  18. case ("cmd:send", msg, actor: Actor) =>
  19. println(s"Master -> 执行命令: 向Worker发送消息: [$msg]")
  20. actor ! msg
  21. case ("resp", msg) =>
  22. println(s"Master -> 收到Worker的回复: [$msg] ") // 收到来自receiver的回应
  23. case _ =>
  24. println(s"Master -> 无法识别")
  25. }
  26. }
  27. }
  28. }
  29. class Worker extends Actor {
  30. override def act(): Unit = {
  31. while (true) {
  32. receive {
  33. case msg: String =>
  34. println(s"Worker -> 收到了来自Master的消息 [$msg]")
  35. sender ! ("resp", "你好Master, 我是Worker") // 回应sender
  36. }
  37. }
  38. }
  39. }

运行输出为:
QQ20160225-2@2x.png-55.9kB


2.4 react

在JVM中,创建和切换threads是要付出代价的。为了减少这方面的overhead,Scala为receive方法提供了一个替代品 —— react

react 也接受一个partial function,但是它在发现并处理一条消息后,并不退出。这意味着,react的实现代码并不需要保存当前线程的call stack,并且当下一个actor醒来时,它可以重用当前的线程。

如果每一个actor都使用react而不是receive,那么只需要一个线程即可支持这些actors(如果主机有多个核,那么会使用多个线程以提高CPU利用率)。

这部分还有内容没有完成,参考书上第32.4小节




添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注