[关闭]
@eric1989 2017-03-24T12:14:39.000000Z 字数 7776 阅读 1082

LinkedTransferQueue源码走读


算法导读

LinkedTransferQueue采用的算法称之为双重队列。与以往的ConcurrentlinkedQueue不同,LinkedTransferQueue中的节点具备两种状态,即可存放一个数据,也可以存放一个请求。而ConcurrentlinkedQueue中节点单纯存储一个数据。
双重队列的算法简单描述如下

take请求(请求节点)和offer请求(数据节点)均可以将自身构建为一个节点放入队列。每次请求时均从队列头节点开始寻找,如果第一个有效节点是匹配的(如果是offer请求则匹配一个put节点,反而亦然),则会匹配该节点(被匹配过的节点则失效,不在参与计算)。如果队列为空,或者第一个有效节点与请求是相同的模式。则将请求包装为节点,放在最后一个模式相同的有效节点之后,或者失效的节点之后。
在这种算法下,队列中要么没有有效节点,要么有效节点都是相同的模式。

双重队列算法可以将请求也当成一个节点放入队列。因此可以不需要锁的情况下,并发实现阻塞提取功能,也就是take接口。
从上面的算法可以看到,双重队列实际是可以不要求更新头尾节点指针。但是如果真的这样做,每次找到有效节点都会带来巨大的性能开销。因此JDk采用了一种折中的方式,也就是jdk源码注释中提到的双重“松弛”队列。实现的角度来说就是头尾指针不需要实时更新,但是不能偏离正确的值太远。比如某个时刻,队列的快照如下。
mark
正常情况下,头指针应该为空或者指向队列中第一个有效节点,而尾指针应该指向队列中最后一个有效节点。而从快照可以看到,两个指针都偏离了正确的值。不实时更新头尾指针可以省下更新的开销,但是对应的,会提高寻找有效节点的开销。为了平衡,jdk的算法中规定最大的偏离距离为2。超过2时代码就会强制更新对应指针。基本原理清晰之后来看代码实现

源码走读

首先来看下队列中节点的定义

  1. static final class Node {
  2. //表明节点是否是一个数据节点。false表明是请求节点
  3. final boolean isData;
  4. //如果是一个数据节点,则初始化时填入数据的值。
  5. //如果一个数据节点被匹配,线程会cas将item置换为null
  6. //如果一个请求节点被匹配。线程会cas将item从null设置为数据的值
  7. //如果一个节点因为超时或者线程中断需要取消。则会将item设置为节点自身。
  8. //一个阻塞请求节点被其他线程给予数据并且唤醒后,会将item设置为自身。
  9. //综上,当item被设置为节点自身时,意味着该节点已经失效
  10. volatile Object item;
  11. //当一个节点不再被head指针指向时,设置next为节点自身。这也意味着该节点已经脱离了队列
  12. volatile Node next;
  13. //当请求节点线程进入等待前,赋予该节点值
  14. volatile Thread waiter;
  15. }

LinkedTransferQueue中所有操作都浓缩在了一个方法之中。而这个方法通过定义个常量数字来区分不同的操作。定义如下:

  1. private static final int NOW = 0; // for untimed poll, tryTransfer 非阻塞式提取
  2. private static final int ASYNC = 1; // for offer, put, add 由于是无界队列,因此这三个方法是等效的
  3. private static final int SYNC = 2; // for transfer, take 阻塞式提取
  4. private static final int TIMED = 3; // for timed poll, tryTransfer

我们重点关注poll和offer的逻辑。首先来所有方法的入口方法xfer

  1. private E xfer(E e, boolean haveData, int how, long nanos) {
  2. if (haveData && (e == null))
  3. throw new NullPointerException();
  4. Node s = null; // the node to append, if needed
  5. retry:
  6. for (;;) {
  7. //首先从head指针指向的节点开始,寻找第一个有效节点并确认是否匹配。
  8. for (Node h = head, p = h; p != null;) {
  9. boolean isData = p.isData;
  10. Object item = p.item;
  11. //item!=p 意味着不是取消节点
  12. //(item != null) == isData 这个表达式很精巧。后文会看到更多类似的表达。这个表达式意味着节点本身没有被匹配掉。
  13. if (item != p && (item != null) == isData) {
  14. //确认节点模式是否相同。
  15. //如果节点模式相同,则跳出循环。按照算法特性,队列中的有效节点都是一个模式的。因此第一个相同,就不用尝试后续的。
  16. if (isData == haveData)
  17. break;
  18. //cas成功意味着匹配完成。如果本次是请求,则item原本是数据,e就为null反之则e是数据,item原本为null
  19. if (p.casItem(item, e)) {
  20. //匹配成功之后检查下head指针是否偏离最近非匹配节点的距离是否大于2
  21. //对于for循环,启动条件只要匹配节点不是head节点就会尝试将head节点cas为本匹配节点的next节点或者本匹配节点当匹配节点没有next节点时
  22. for (Node q = p; q != h;) {
  23. Node n = q.next;
  24. if (head == h && casHead(h, n == null ? q : n)) {
  25. //cas head节点成功,则将之前的节点的next指针指向节点自身。这意味着节点脱离了队伍。同时,这也可以帮助gc。因为指向自身后,节点的不可达状态判断会更容易些。
  26. //当节点的next指针指向自身,意味着该节点已经脱离队伍,是一个无效节点。这个是一个判断的充分条件。
  27. h.forgetNext();
  28. break;
  29. }
  30. //cas 失败后则检查当前head指针距离下一个有效节点是否大于2.大于则再次循环,否则退出。头节点的松弛长度由这段代码决定。从代码上可以看出,松弛距离是2.
  31. if ((h = head) == null ||
  32. (q = h.next) == null || !q.isMatched())
  33. break;
  34. }
  35. LockSupport.unpark(p.waiter);
  36. return LinkedTransferQueue.<E>cast(item);
  37. }
  38. }
  39. //如果没有找到匹配节点就尝试下一个节点。如果该节点已经脱离队列,则从头开始。
  40. Node n = p.next;
  41. p = (p != n) ? n : (h = head); // Use head if p offlist
  42. }
  43. //没有找到匹配节点的情况下,并且不是无阻塞提取的操作的话,就构建一个节点,将自身节点放入队尾
  44. if (how != NOW) {
  45. if (s == null)
  46. s = new Node(e, haveData);
  47. //尝试将节点放入队尾。该方式是存在失败可能。此时需要重新开始完整流程。而s不需要初始化两次。保存作为临时变量即可。
  48. Node pred = tryAppend(s, haveData);
  49. if (pred == null)
  50. continue retry; // lost race vs opposite mode
  51. //入队成功后,如果是同步或者超时类操作。
  52. if (how != ASYNC)
  53. return awaitMatch(s, pred, e, (how == TIMED), nanos);
  54. }
  55. return e; // not waiting
  56. }
  57. }
  58. private Node tryAppend(Node s, boolean haveData)
  59. {
  60. //从尾节点开始,准备插入自身节点
  61. for (Node t = tail, p = t;;)
  62. {
  63. Node n, u;
  64. //如果队列为空,则尝试初始化
  65. if (p == null && (p = head) == null)
  66. {
  67. if (casHead(null, s))
  68. return s;
  69. }
  70. //判断当前节点是否可以成为本节点的前置节点。判断标准为:当前节点未被匹配并且数据模式与本节点不同则无法匹配。其他情况可以匹配。
  71. else if (p.cannotPrecede(haveData))
  72. //如果两个不同属性的节点同时入队则会造成此种情况。cas失败的一方就会发现前置节点和自身数据模式不同。此时要退出本方法,从xfer的循环重新开始
  73. return null;
  74. //p节点并非最后的一个节点,则向后移动。由于是松弛队列,此种情况较为常见。
  75. else if ((n = p.next) != null)
  76. //这个表达式看的相当拗口。不清楚这么做是否能提高效率。欢迎熟悉jit优化的朋友出来说下。
  77. //将指针向后移动之前,为了保证效率。确认是否有必要检查tail指针已经有变化。如果有变化,则将p设置为tail节点,重新开始。否则的话就移动到p的后继节点。
  78. //只有当p节点和一开始的t节点不相等的时候才需要检查tail指针是否变化,否则直接移动到后继节点。
  79. //这个表达式做出的这些选择基本上是出于一种效率的选择。否则不停的向后移动指针也是正确的。但有时候会比较慢,特别是tail节点变化的情况下更意味着如此。
  80. p = p != t && t != (u = tail) ? (t = u) :
  81. (p != n) ? n : null;
  82. //p节点可以放入的话就尝试cas,如果失败就读取起next节点。重新执行流程。
  83. else if (!p.casNext(null, s))
  84. p = p.next;
  85. else
  86. {
  87. if (p != t)
  88. {
  89. //又是这种一坨的代码。心好累。
  90. //首先看(tail != t || !casTail(t, s)) 。如果tail节点已经变化了,则尝试cas tail节点。如果失败,则判断当前的tail节点的松弛长度是否超过2.判断的代码就是后面的条件语句
  91. while ((tail != t || !casTail(t, s)) &&
  92. (t = tail) != null &&
  93. (s = t.next) != null && // advance and retry
  94. (s = s.next) != null && s != t);
  95. }
  96. return p;
  97. }
  98. }
  99. }
  100. //阻塞或者超时类操作
  101. private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
  102. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  103. Thread w = Thread.currentThread();
  104. int spins = -1;
  105. ThreadLocalRandom randomYields = null;
  106. for (;;) {
  107. Object item = s.item;
  108. //此类操作,e都是null。当item不为null的时候意味着有数据放入并且可能唤醒了对应的等待线程。这说明节点已经被匹配,获得了想要的数据。
  109. if (item != e) {
  110. //将item设置为自身节点,清空waiter数据。对gc友好。
  111. s.forgetContents();
  112. return LinkedTransferQueue.<E>cast(item);
  113. }
  114. //线程中断或者超时到达,通过cas操作让节点失效。
  115. if ((w.isInterrupted() || (timed && nanos <= 0)) &&
  116. s.casItem(e, s)) {
  117. //取消节点后,则尝试清理队列。这对于保持队列的高效,缩短无效路径有帮助。
  118. unsplice(pred, s);
  119. return e;
  120. }
  121. //spin是一个自旋策略。线程阻塞和唤醒是一个比较大的消耗操作。而在高并发竞争中,一般而言马上就可以获取到数据。因此在陷入阻塞之前进行自旋可以很有效的提高效率。
  122. //jdk采用的自旋规则是,如果一个节点是队列中第一个有效节点,则自旋N次。如果不是则自旋N/2次。jdk对N的取值是128.在自旋的过程中可能还会随机执行线程让渡操作。此外,如果本节点和前置节点的模式不同,意味着本节点的操作数量可能更多,此时自旋操作数量要更多,也就是3/2N的数量。
  123. if (spins < 0) {
  124. if ((spins = spinsFor(pred, s.isData)) > 0)
  125. randomYields = ThreadLocalRandom.current();
  126. }
  127. else if (spins > 0) { // spin
  128. --spins;
  129. if (randomYields.nextInt(CHAINED_SPINS) == 0)
  130. Thread.yield();
  131. }
  132. else if (s.waiter == null) {
  133. s.waiter = w;
  134. }
  135. else if (timed) {
  136. nanos = deadline - System.nanoTime();
  137. if (nanos > 0L)
  138. LockSupport.parkNanos(this, nanos);
  139. }
  140. else {
  141. LockSupport.park(this);
  142. }
  143. }
  144. }
  145. //队列清理
  146. final void unsplice(Node pred, Node s) {
  147. //将item设置为自身节点,清空waiter数据。对gc友好。
  148. s.forgetContents();
  149. //if判断确认pred节点确实存在并且仍然是s的前置节点。这是一切开始的前提
  150. if (pred != null && pred != s && pred.next == s) {
  151. Node n = s.next;
  152. //一般情况下就只是将前置节点的next设置为本节点的后置节点。让遍历路径中去掉该取消节点即可。
  153. //而如果发现该取消节点是最后一个节点或者前置节点已经无效了,就需要做进一步的清理动作。
  154. if (n == null ||(n != s && pred.casNext(s, n) && pred.isMatched())) {
  155. //首先是帮忙清理头结点,如果头节点已经失效的话
  156. for (;;) {
  157. Node h = head;
  158. if (h == pred || h == s || h == null)
  159. return; // at head or list empty
  160. if (!h.isMatched())
  161. break;
  162. Node hn = h.next;
  163. if (hn == null)
  164. return; // now empty
  165. if (hn != h && casHead(h, hn))
  166. h.forgetNext(); // advance head
  167. }
  168. //如果前置节点和本身节点都在队列中,与上面的条件结合,此时这两个节点都是无效节点。此时就对sweepVotes(清除投票计数)原子增。如果一个线程发现其触发了阀值。则执行全队列清除动作。
  169. if (pred.next != pred && s.next != s) {
  170. for (;;) {
  171. int v = sweepVotes;
  172. if (v < SWEEP_THRESHOLD) {
  173. if (casSweepVotes(v, v + 1))
  174. break;
  175. }
  176. else if (casSweepVotes(v, 0)) {
  177. sweep();
  178. break;
  179. }
  180. }
  181. }
  182. }
  183. }
  184. }
  185. //全队列清除方法。这个方法没太多说的,就是从头节点开始,检测每一个节点的有效性
  186. private void sweep() {
  187. for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
  188. if (!s.isMatched())
  189. // Unmatched nodes are never self-linked
  190. p = s;
  191. else if ((n = s.next) == null) // trailing node is pinned
  192. break;
  193. else if (s == n) // stale
  194. // No need to also check for p == s, since that implies s == n
  195. p = head;
  196. else
  197. p.casNext(s, n);
  198. }
  199. }

实现bug

是的没有错,在这个具体实现中存在着一个不是很明显的bug。该bug应该是被国人最早发现。并且提交的oracle了。但是在jdk8中仍然尚未修复。oracle的解释是难以复现,不懂是不是借口。因为根据算法实际上是很好复现的。下面来说下具体的bug步骤。

  1. 假设3个线程offer(放入数据),takeAndCancelImmediately(提取数据但是线程本身被中断,或者说超时时间很短),take(阻塞式提取)分别尝试操作队列。
  2. 由于队列为空,则三个线程都构造了自身节点并且将自身节点尝试放入队列。
  3. 假设放入的顺序是take,takeAndCancelImmediately,offer。由于takeAndCancelImmediately立刻超时导致节点取消,因此offer是可以跟在takeAndCancelImmediately的节点之后而不会导致重新发起尝试。这里不会发起重试是导致bug的直接原因。
  4. 经过上面三个步骤此时bug形成。队列中第一个节点是阻塞式数据获取,第二个节点是一个无效节点,第三个节点是一个数据节点。那么线程take无法得到线程offer的数据。
  5. 如果此时再来一个take类的线程,会导致更加严重的问题。因为队列头结点也是take类的,因此线程会到队尾尝试入队,而发现队尾的节点是数据模式节点,又会从头重试。重而导致该线程死循环,将cpu打满。

看过步骤,我们可以看下重现bug的代码。该段代码来自并发编程网。该位仁兄应该是最早发现这个bug的人。
只要在linkedtransferqueue中的tryAppend方法打断点,并且按照上面的步骤顺序让线程运行,bug就会重现。

  1. public static void main(String[] args)
  2. {
  3. final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>();
  4. Runnable offerTask = new Runnable() {
  5. public void run()
  6. {
  7. queue.offer(8L);
  8. System.out.println("offerTask thread has gone!");
  9. }
  10. };
  11. Runnable takeTask = new Runnable() {
  12. public void run()
  13. {
  14. try
  15. {
  16. System.out.println(Thread.currentThread().getId() + " " + queue.take());
  17. }
  18. catch (InterruptedException e)
  19. {
  20. }
  21. }
  22. };
  23. Runnable takeTaskInterrupted = new Runnable() {
  24. public void run()
  25. {
  26. Thread.currentThread().interrupt();
  27. try
  28. {
  29. System.out.println(Thread.currentThread().getId() + " " + queue.take());
  30. }
  31. catch (InterruptedException e)
  32. {
  33. System.out.println(e + " " + Thread.currentThread().getId());
  34. }
  35. }
  36. };
  37. new Thread(offerTask,"offer").start();
  38. new Thread(takeTask,"take").start();
  39. new Thread(takeTaskInterrupted,"takeAndCancelImmediately").start();
  40. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注