@MiloXia
2016-01-28T15:16:28.000000Z
字数 2047
阅读 1679
scala
并发
对var(共享变量)的lift, 提供了事务性的访问
         |- revision                        当前value的版本
Ref[T] -|  get: Context -> T                获取值(任意Context均可读取)
         |- := : T -> Transaction -> Unit   赋值(只能在Transaction中进行)
/**
 * A reference cell holding a value of type T together with a revision number.
 * Reads and writes are delegated to a Context / Transaction rather than
 * touching the stored value directly.
 *
 * @param v the initial value held by the reference
 */
class Ref[T](v: T) {
// The current value and its revision are kept in a single tuple so that both
// are replaced by one assignment; the revision starts at 0.
private var contents_ = (v, 0) // atomically bind value to revision
// NOTE(review): the use of `blocks` is not visible in this excerpt — confirm
// its purpose against the full source.
private var blocks = Set[AnyRef]()
/**
 * No-argument constructor: seeds the reference with `null.asInstanceOf[T]`,
 * i.e. the language's default value for whatever T is instantiated to.
 */
def this() = {
  this(null.asInstanceOf[T])
}
/**
 * Reads this reference through the implicit context in scope.
 *
 * The lookup itself is performed by the context's `retrieve` method; this
 * method only forwards the call.
 *
 * @param c the context used to resolve the current value
 */
def get(implicit c: Context) = c.retrieve(this) // delegated to the Context
/**
 * Assigns a new value to this reference.
 *
 * The write is forwarded to the implicit transaction's `store` method, so an
 * assignment requires a Transaction in scope.
 *
 * @param v the value to store
 * @param c the transaction that records the write
 */
def :=(v: T)(implicit c: Transaction): Unit = {
  // Explicit `: Unit =` replaces the deprecated procedure syntax.
  c.store(this)(v) // delegated to the Transaction
}
...
|- Transaction 本地事务
Context -|
|- LiveContext 全局事务(object)
Each transaction will have its own Transaction context which it will use in both read and write operations. LiveContext will only be used outside of a transaction, when there is no other context available.
|- rev 本事务的id
Transaction -|- map: world Ref[Any] -> Any 保存对所有refs的修改/读取值(local cache 等待提交)
|- set: writes [Ref[Any]] 保存对哪些refs有修改(写入)
|- map: version Ref[Any] -> Int
/**
 * Tries to publish this transaction's buffered writes.
 *
 * While holding `CommitLock`, every ref recorded in `world` must still carry
 * the revision remembered for it in `version`. If all of them match, each ref
 * in `writes` receives its buffered value from `world`, stamped with this
 * transaction's `rev`.
 *
 * @return true when the writes were applied, false when validation failed;
 *         the caller decides whether to retry
 */
def commit() = {
  CommitLock.synchronized {
    // Validation: each ref's current revision must equal the revision this
    // transaction recorded for it.
    val consistent = world.forall { case (ref, _) =>
      ref.contents._2 == version(ref)
    }

    // On success, publish the buffered values and bump each written ref's
    // revision to this transaction's id.
    if (consistent) {
      writes.foreach { ref =>
        ref.contents = (world(ref), rev)
      }
    }

    // Report the outcome; retrying is the caller's decision.
    consistent
  }
}
/**
 * Runs `f` as a transaction with no guard condition: equivalent to the
 * two-argument-list overload with a condition that is always true.
 *
 * @param f the transactional body, given the Transaction it runs in
 * @return the value produced by `f`
 */
def atomic[A](f: Transaction => A): A = atomic(cond = true)(f)
/**
 * Runs `f` inside a fresh Transaction, retrying until a commit succeeds or
 * `cond` becomes false.
 *
 * `cond` is a by-name parameter and is re-evaluated before every attempt.
 * A failed commit, or a non-fatal exception thrown by `f` or by the commit,
 * is treated as a conflict and triggers another attempt.
 *
 * Fatal throwables (as classified by `scala.util.control.NonFatal`, e.g.
 * VirtualMachineError, InterruptedException, ControlThrowable) are NOT
 * retried and propagate to the caller.
 *
 * @param cond guard checked before each attempt
 * @param f    the transactional body, given the Transaction it runs in
 * @return the result of the first attempt that commits, or
 *         `null.asInstanceOf[A]` if `cond` is false before an attempt
 */
def atomic[A](cond: =>Boolean)(f: (Transaction)=>A): A = {
  // The retry is kept out of the try/catch so the recursive call is in tail
  // position; otherwise each retry would add a stack frame.
  @scala.annotation.tailrec
  def attemptTransact(): A = {
    if (cond) {
      val trans = new Transaction(rev)
      // Some(result) on a successful commit, None when a retry is needed.
      val outcome: Option[A] =
        try {
          val result = f(trans)
          if (trans.commit()) Some(result) else None
        } catch {
          // Assume a conflict and retry, but let fatal errors escape.
          case scala.util.control.NonFatal(_) => None
        }
      outcome match {
        case Some(result) => result
        case None => attemptTransact()
      }
    } else null.asInstanceOf[A]
  }
  attemptTransact()
}
源码:http://www.codecommit.com/blog/misc/software-transactional-memory-in-scala/scala_stm.zip