@xtccc
2016-03-04T14:47:37.000000Z
字数 3440
阅读 2576
Scala
Java为并发编程提供的各种方法/库,都基于 shared data / lock model,用起来易错、麻烦,很容易跌入race condition和dead lock的陷阱。 Scala Actor则提供了一个库,具有鲜明的share-nothing/messafe-passing特征,更加易于使用。
目录
非常简单,扩展Actor
接口即可。
// 需要引入库: org.scala-lang:scala-actors:2.10.4
import scala.actors._
object StartActor {
def main(agrs: Array[String]): Unit = {
println("Starting ...")
SillyActor.start
SeriousActor.start
println("Done ...")
}
}
object SillyActor extends Actor {
override def act(): Unit = {
for (i <- 0 until 5) {
println(s"SillyActor: ${new Date()}")
Thread.sleep(2000)
}
}
}
object SeriousActor extends Actor {
override
def act(): Unit = {
for (i <- 0 until 5) {
println(s"SeriousActor: ${new Date()}")
Thread.sleep(3000)
}
}
}
输出为:
可见,这两个actors之间是独立运行的。
除了上面调用start
方法之外,还可以直接通过val
来创建一个actor
方法:
import scala.actors.Actor._
object StartActor extends App {
val myActor = actor { // 定义后立即运行
for (i <- 1 to 5) {
println(i)
Thread.sleep(1000)
}
}
}
如果想把当前的线程看作是一个Actor,可以使用Actor.self:
println("开始 ...")
self ! 100
self ! "你好"
self.receive{case x: String => println(s"String内容为 [$x]")}
self.receive{case x: Int => println(s"Int内容为 [$x]")}
println("结束 ...")
输出为:
两个Actors之间怎样通讯?通过发送消息。
通过 [actor] ! [msg]
或者 [actor] ? [msg]
的形式,可以向actor
发送msg
。
! 代表tell - 向actor发送msg,然后立即返回
? 代表ask - 向actor发送msg,然后返回一个Future
例:
import scala.actors.Actor._
object StartActor extends App {
val myActor = actor { // `myActor`运行后永不退出
while (true) { // 如果没有 while ,那么只能接收一次消息
receive { // actor 通过 `receive` 方法接收消息
case msg => println(s"消息: [$msg]")
}
}
}
println("actor创建完毕")
myActor ! "你好"
myActor ! "hello"
}
当Actor发送消息时,它不会被block;
当Actor接收消息时,他也不会被interrupt;且到来的消息一直在它的消息箱中,直到它调用receive;
receive接收到的实际上是一个partial function:
An actor will only process messages matching one of the cases in the partial function passed to receive. For each message in the mailbox, receive will first invoke isDefinedAt on the passed partial function to determine whether it has a case that will match and handle the message. The receive method will choose the first message in the mailbox for which isDefinedAt returns true, and pass that message to the partial function’s apply method. The partial function’s apply method will handle the message.
例:
import scala.actors.Actor._
val actor1 = actor {
while (true) { // 必须有while,否则只能receive一次消息
receive {
case x: Int => println(s"Type: Int, Value: [$x]")
case x: String => println(s"Type: String, Value: [$x]")
case x: Double => println(s"Type: Double, Value: [$x]")
case _ => println(s"Type: Not allowed")
}
}
}
actor1 ! 100
actor1 ! "你好"
actor1 ! 1.23456
actor1 ! true
输出为:
当Actor a收到Actor b的消息时,b可以向a进行回应。receive
收到的数据中,隐含了sender。
例:
import scala.actors.Actor
/**
* Created by tao on 2/25/16.
*/
object SenderExample {
def main(args: Array[String]): Unit = {
val master = new Master
val worker = new Worker
master.start
worker.start
master ! ("cmd:send", "今天天气不错,挺风和日丽的", worker)
}
}
class Master extends Actor {
override def act() = {
while (true) {
receive {
case ("cmd:send", msg, actor: Actor) =>
println(s"Master -> 执行命令: 向Worker发送消息: [$msg]")
actor ! msg
case ("resp", msg) =>
println(s"Master -> 收到Worker的回复: [$msg] ") // 收到来自receiver的回应
case _ =>
println(s"Master -> 无法识别")
}
}
}
}
class Worker extends Actor {
override def act(): Unit = {
while (true) {
receive {
case msg: String =>
println(s"Worker -> 收到了来自Master的消息 [$msg]")
sender ! ("resp", "你好Master, 我是Worker") // 回应sender
}
}
}
}
运行输出为:
在JVM中,创建和切换threads是要付出代价的。为了减少这方面的overhead,Scala为receive方法提供了一个替代品 —— react。
react 也接受一个partial function,但是它在发现并处理一条消息后,并不退出。这意味着,react的实现代码并不需要保存当前线程的call stack,并且当下一个actor醒来时,它可以重用当前的线程。
如果每一个actor都使用react而不是receive,那么只需要一个线程即可支持这些actors(如果主机有多个核,那么会使用多个线程以提高CPU利用率)。
这部分还有内容没有完成,参考书上第32.4小节