@eric1989
2017-03-24T04:14:39.000000Z
字数 7776
阅读 1326
LinkedTransferQueue采用的算法称之为双重队列。与以往的ConcurrentlinkedQueue不同,LinkedTransferQueue中的节点具备两种状态,即可存放一个数据,也可以存放一个请求。而ConcurrentlinkedQueue中节点单纯存储一个数据。
双重队列的算法简单描述如下
take请求(请求节点)和offer请求(数据节点)均可以将自身构建为一个节点放入队列。每次请求时均从队列头节点开始寻找,如果第一个有效节点是匹配的(如果是offer请求则匹配一个put节点,反而亦然),则会匹配该节点(被匹配过的节点则失效,不在参与计算)。如果队列为空,或者第一个有效节点与请求是相同的模式。则将请求包装为节点,放在最后一个模式相同的有效节点之后,或者失效的节点之后。
在这种算法下,队列中要么没有有效节点,要么有效节点都是相同的模式。
双重队列算法可以将请求也当成一个节点放入队列。因此可以不需要锁的情况下,并发实现阻塞提取功能,也就是take接口。
从上面的算法可以看到,双重队列实际是可以不要求更新头尾节点指针。但是如果真的这样做,每次找到有效节点都会带来巨大的性能开销。因此JDk采用了一种折中的方式,也就是jdk源码注释中提到的双重“松弛”队列。实现的角度来说就是头尾指针不需要实时更新,但是不能偏离正确的值太远。比如某个时刻,队列的快照如下。
正常情况下,头指针应该为空或者指向队列中第一个有效节点,而尾指针应该指向队列中最后一个有效节点。而从快照可以看到,两个指针都偏离了正确的值。不实时更新头尾指针可以省下更新的开销,但是对应的,会提高寻找有效节点的开销。为了平衡,jdk的算法中规定最大的偏离距离为2。超过2时代码就会强制更新对应指针。基本原理清晰之后来看代码实现
首先来看下队列中节点的定义
static final class Node {//表明节点是否是一个数据节点。false表明是请求节点final boolean isData;//如果是一个数据节点,则初始化时填入数据的值。//如果一个数据节点被匹配,线程会cas将item置换为null//如果一个请求节点被匹配。线程会cas将item从null设置为数据的值//如果一个节点因为超时或者线程中断需要取消。则会将item设置为节点自身。//一个阻塞请求节点被其他线程给予数据并且唤醒后,会将item设置为自身。//综上,当item被设置为节点自身时,意味着该节点已经失效volatile Object item;//当一个节点不再被head指针指向时,设置next为节点自身。这也意味着该节点已经脱离了队列volatile Node next;//当请求节点线程进入等待前,赋予该节点值volatile Thread waiter;}
LinkedTransferQueue中所有操作都浓缩在了一个方法之中。而这个方法通过定义个常量数字来区分不同的操作。定义如下:
private static final int NOW = 0; // for untimed poll, tryTransfer 非阻塞式提取private static final int ASYNC = 1; // for offer, put, add 由于是无界队列,因此这三个方法是等效的private static final int SYNC = 2; // for transfer, take 阻塞式提取private static final int TIMED = 3; // for timed poll, tryTransfer
我们重点关注poll和offer的逻辑。首先来所有方法的入口方法xfer
private E xfer(E e, boolean haveData, int how, long nanos) {if (haveData && (e == null))throw new NullPointerException();Node s = null; // the node to append, if neededretry:for (;;) {//首先从head指针指向的节点开始,寻找第一个有效节点并确认是否匹配。for (Node h = head, p = h; p != null;) {boolean isData = p.isData;Object item = p.item;//item!=p 意味着不是取消节点//(item != null) == isData 这个表达式很精巧。后文会看到更多类似的表达。这个表达式意味着节点本身没有被匹配掉。if (item != p && (item != null) == isData) {//确认节点模式是否相同。//如果节点模式相同,则跳出循环。按照算法特性,队列中的有效节点都是一个模式的。因此第一个相同,就不用尝试后续的。if (isData == haveData)break;//cas成功意味着匹配完成。如果本次是请求,则item原本是数据,e就为null反之则e是数据,item原本为nullif (p.casItem(item, e)) {//匹配成功之后检查下head指针是否偏离最近非匹配节点的距离是否大于2//对于for循环,启动条件只要匹配节点不是head节点就会尝试将head节点cas为本匹配节点的next节点或者本匹配节点当匹配节点没有next节点时for (Node q = p; q != h;) {Node n = q.next;if (head == h && casHead(h, n == null ? q : n)) {//cas head节点成功,则将之前的节点的next指针指向节点自身。这意味着节点脱离了队伍。同时,这也可以帮助gc。因为指向自身后,节点的不可达状态判断会更容易些。//当节点的next指针指向自身,意味着该节点已经脱离队伍,是一个无效节点。这个是一个判断的充分条件。h.forgetNext();break;}//cas 失败后则检查当前head指针距离下一个有效节点是否大于2.大于则再次循环,否则退出。头节点的松弛长度由这段代码决定。从代码上可以看出,松弛距离是2.if ((h = head) == null ||(q = h.next) == null || !q.isMatched())break;}LockSupport.unpark(p.waiter);return LinkedTransferQueue.<E>cast(item);}}//如果没有找到匹配节点就尝试下一个节点。如果该节点已经脱离队列,则从头开始。Node n = p.next;p = (p != n) ? n : (h = head); // Use head if p offlist}//没有找到匹配节点的情况下,并且不是无阻塞提取的操作的话,就构建一个节点,将自身节点放入队尾if (how != NOW) {if (s == null)s = new Node(e, haveData);//尝试将节点放入队尾。该方式是存在失败可能。此时需要重新开始完整流程。而s不需要初始化两次。保存作为临时变量即可。Node pred = tryAppend(s, haveData);if (pred == null)continue retry; // lost race vs opposite mode//入队成功后,如果是同步或者超时类操作。if (how != ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);}return e; // not waiting}}private Node tryAppend(Node s, boolean haveData){//从尾节点开始,准备插入自身节点for (Node t = tail, p = t;;){Node n, u;//如果队列为空,则尝试初始化if (p == null && (p = head) == null){if (casHead(null, s))return s;}//判断当前节点是否可以成为本节点的前置节点。判断标准为:当前节点未被匹配并且数据模式与本节点不同则无法匹配。其他情况可以匹配。else if (p.cannotPrecede(haveData))//如果两个不同属性的节点同时入队则会造成此种情况。cas失败的一方就会发现前置节点和自身数据模式不同。此时要退出本方法,从xfer的循环重新开始return null;//p节点并非最后的一个节点,则向后移动。由于是松弛队列,此种情况较为常见。else if ((n = p.next) != null)//这个表达式看的相当拗口。不清楚这么做是否能提高效率。欢迎熟悉jit优化的朋友出来说下。//将指针向后移动之前,为了保证效率。确认是否有必要检查tail指针已经有变化。如果有变化,则将p设置为tail节点,重新开始。否则的话就移动到p的后继节点。//只有当p节点和一开始的t节点不相等的时候才需要检查tail指针是否变化,否则直接移动到后继节点。//这个表达式做出的这些选择基本上是出于一种效率的选择。否则不停的向后移动指针也是正确的。但有时候会比较慢,特别是tail节点变化的情况下更意味着如此。p = p != t && t != (u = tail) ? (t = u) :(p != n) ? n : null;//p节点可以放入的话就尝试cas,如果失败就读取起next节点。重新执行流程。else if (!p.casNext(null, s))p = p.next;else{if (p != t){//又是这种一坨的代码。心好累。//首先看(tail != t || !casTail(t, s)) 。如果tail节点已经变化了,则尝试cas tail节点。如果失败,则判断当前的tail节点的松弛长度是否超过2.判断的代码就是后面的条件语句while ((tail != t || !casTail(t, s)) &&(t = tail) != null &&(s = t.next) != null && // advance and retry(s = s.next) != null && s != t);}return p;}}}//阻塞或者超时类操作private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();int spins = -1;ThreadLocalRandom randomYields = null;for (;;) {Object item = s.item;//此类操作,e都是null。当item不为null的时候意味着有数据放入并且可能唤醒了对应的等待线程。这说明节点已经被匹配,获得了想要的数据。if (item != e) {//将item设置为自身节点,清空waiter数据。对gc友好。s.forgetContents();return LinkedTransferQueue.<E>cast(item);}//线程中断或者超时到达,通过cas操作让节点失效。if ((w.isInterrupted() || (timed && nanos <= 0)) &&s.casItem(e, s)) {//取消节点后,则尝试清理队列。这对于保持队列的高效,缩短无效路径有帮助。unsplice(pred, s);return e;}//spin是一个自旋策略。线程阻塞和唤醒是一个比较大的消耗操作。而在高并发竞争中,一般而言马上就可以获取到数据。因此在陷入阻塞之前进行自旋可以很有效的提高效率。//jdk采用的自旋规则是,如果一个节点是队列中第一个有效节点,则自旋N次。如果不是则自旋N/2次。jdk对N的取值是128.在自旋的过程中可能还会随机执行线程让渡操作。此外,如果本节点和前置节点的模式不同,意味着本节点的操作数量可能更多,此时自旋操作数量要更多,也就是3/2N的数量。if (spins < 0) {if ((spins = spinsFor(pred, s.isData)) > 0)randomYields = ThreadLocalRandom.current();}else if (spins > 0) { // spin--spins;if (randomYields.nextInt(CHAINED_SPINS) == 0)Thread.yield();}else if (s.waiter == null) {s.waiter = w;}else if (timed) {nanos = deadline - System.nanoTime();if (nanos > 0L)LockSupport.parkNanos(this, nanos);}else {LockSupport.park(this);}}}//队列清理final void unsplice(Node pred, Node s) {//将item设置为自身节点,清空waiter数据。对gc友好。s.forgetContents();//if判断确认pred节点确实存在并且仍然是s的前置节点。这是一切开始的前提if (pred != null && pred != s && pred.next == s) {Node n = s.next;//一般情况下就只是将前置节点的next设置为本节点的后置节点。让遍历路径中去掉该取消节点即可。//而如果发现该取消节点是最后一个节点或者前置节点已经无效了,就需要做进一步的清理动作。if (n == null ||(n != s && pred.casNext(s, n) && pred.isMatched())) {//首先是帮忙清理头结点,如果头节点已经失效的话for (;;) {Node h = head;if (h == pred || h == s || h == null)return; // at head or list emptyif (!h.isMatched())break;Node hn = h.next;if (hn == null)return; // now emptyif (hn != h && casHead(h, hn))h.forgetNext(); // advance head}//如果前置节点和本身节点都在队列中,与上面的条件结合,此时这两个节点都是无效节点。此时就对sweepVotes(清除投票计数)原子增。如果一个线程发现其触发了阀值。则执行全队列清除动作。if (pred.next != pred && s.next != s) {for (;;) {int v = sweepVotes;if (v < SWEEP_THRESHOLD) {if (casSweepVotes(v, v + 1))break;}else if (casSweepVotes(v, 0)) {sweep();break;}}}}}}//全队列清除方法。这个方法没太多说的,就是从头节点开始,检测每一个节点的有效性private void sweep() {for (Node p = head, s, n; p != null && (s = p.next) != null; ) {if (!s.isMatched())// Unmatched nodes are never self-linkedp = s;else if ((n = s.next) == null) // trailing node is pinnedbreak;else if (s == n) // stale// No need to also check for p == s, since that implies s == np = head;elsep.casNext(s, n);}}
是的没有错,在这个具体实现中存在着一个不是很明显的bug。该bug应该是被国人最早发现。并且提交的oracle了。但是在jdk8中仍然尚未修复。oracle的解释是难以复现,不懂是不是借口。因为根据算法实际上是很好复现的。下面来说下具体的bug步骤。
看过步骤,我们可以看下重现bug的代码。该段代码来自并发编程网。该位仁兄应该是最早发现这个bug的人。
只要在linkedtransferqueue中的tryAppend方法打断点,并且按照上面的步骤顺序让线程运行,bug就会重现。
public static void main(String[] args){final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>();Runnable offerTask = new Runnable() {public void run(){queue.offer(8L);System.out.println("offerTask thread has gone!");}};Runnable takeTask = new Runnable() {public void run(){try{System.out.println(Thread.currentThread().getId() + " " + queue.take());}catch (InterruptedException e){}}};Runnable takeTaskInterrupted = new Runnable() {public void run(){Thread.currentThread().interrupt();try{System.out.println(Thread.currentThread().getId() + " " + queue.take());}catch (InterruptedException e){System.out.println(e + " " + Thread.currentThread().getId());}}};new Thread(offerTask,"offer").start();new Thread(takeTask,"take").start();new Thread(takeTaskInterrupted,"takeAndCancelImmediately").start();}