[关闭]
@MiloXia 2016-01-28T15:16:28.000000Z 字数 2047 阅读 1679

STM

scala 并发


STM实现原理

Ref[T]

对var(共享变量)的lift, 提供了事务性的访问

  1. |- revision 当前value的版本
  2. Ref[T] -| get Context -> T 获取值
  3. |- := : T -> Context -> Unit 赋值
  1. class Ref[T](v: T) {
  2. private var contents_ = (v, 0) // atomically bind value to revision
  3. private var blocks = Set[AnyRef]()
  4. /**
  5. * Convenience constructor which simply utilizes the language
  6. * default for the given type instantiation.
  7. */
  8. def this() = this(null.asInstanceOf[T])
  9. /**
  10. * Alright, ballot time: who wants a unary_*(Context) method
  11. * as syntax sugar for this? Anyone? :-)
  12. */
  13. def get(implicit c: Context) = c retrieve this //委托给Context
  14. def :=(v: T)(implicit c: Transaction) {
  15. c.store(this)(v) //委托给Transaction
  16. }
  17. ...

Context

  1. |- Transaction 本地事务
  2. Context -|
  3. |- LiveContext 全局事务(object

Each transaction will have its own Transaction context which it will use in both read and write operations. LiveContext will only be used outside of a transaction, when there is no other context available.

Transaction

  1. |- rev 本事务的id
  2. Transaction -|- map: world Ref[Any] -> Any 保存对所有refs的修改/读取值(local cache 等待提交)
  3. |- set: writes [Ref[Any]] 保存对哪些refs有修改(写入)
  4. |- map: version Ref[Any] -> Int
commit
  1. def commit() = {
  2. CommitLock.synchronized {
  3. //检查所有ref的版本是否于当前事务版本(id)一致
  4. val back = world.foldLeft(true) { (success, tuple) =>
  5. val (ref, _) = tuple
  6. success && ref.contents._2 == version(ref)
  7. }
  8. //如果一致则提交更新,并修改refs的revision
  9. if (back) {
  10. for (ref <- writes) {
  11. ref.contents = (world(ref), rev)
  12. }
  13. }
  14. //返回成功失败,由外部决定是否要重试
  15. back
  16. }
  17. }

atomic

  1. def atomic[A](f: (Transaction)=>A): A = atomic(true)(f)
  2. def atomic[A](cond: =>Boolean)(f: (Transaction)=>A) = {
  3. def attemptTransact(): A = {
  4. if (cond) {
  5. val trans = new Transaction(rev)
  6. try {
  7. val result = f(trans)
  8. if (trans.commit()) result else attemptTransact()
  9. } catch {
  10. case _ => attemptTransact() // if exception, assume conflict and retry
  11. }
  12. } else null.asInstanceOf[A]
  13. }
  14. attemptTransact()
  15. }

源码:http://www.codecommit.com/blog/misc/software-transactional-memory-in-scala/scala_stm.zip

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注