@eric1989
2017-03-24T12:14:39.000000Z
字数 7776
阅读 1082
LinkedTransferQueue采用的算法称之为双重队列。与以往的ConcurrentlinkedQueue不同,LinkedTransferQueue中的节点具备两种状态,即可存放一个数据,也可以存放一个请求。而ConcurrentlinkedQueue中节点单纯存储一个数据。
双重队列的算法简单描述如下
take请求(请求节点)和offer请求(数据节点)均可以将自身构建为一个节点放入队列。每次请求时均从队列头节点开始寻找,如果第一个有效节点是匹配的(如果是offer请求则匹配一个put节点,反而亦然),则会匹配该节点(被匹配过的节点则失效,不在参与计算)。如果队列为空,或者第一个有效节点与请求是相同的模式。则将请求包装为节点,放在最后一个模式相同的有效节点之后,或者失效的节点之后。
在这种算法下,队列中要么没有有效节点,要么有效节点都是相同的模式。
双重队列算法可以将请求也当成一个节点放入队列。因此可以不需要锁的情况下,并发实现阻塞提取功能,也就是take接口。
从上面的算法可以看到,双重队列实际是可以不要求更新头尾节点指针。但是如果真的这样做,每次找到有效节点都会带来巨大的性能开销。因此JDk采用了一种折中的方式,也就是jdk源码注释中提到的双重“松弛”队列。实现的角度来说就是头尾指针不需要实时更新,但是不能偏离正确的值太远。比如某个时刻,队列的快照如下。
正常情况下,头指针应该为空或者指向队列中第一个有效节点,而尾指针应该指向队列中最后一个有效节点。而从快照可以看到,两个指针都偏离了正确的值。不实时更新头尾指针可以省下更新的开销,但是对应的,会提高寻找有效节点的开销。为了平衡,jdk的算法中规定最大的偏离距离为2。超过2时代码就会强制更新对应指针。基本原理清晰之后来看代码实现
首先来看下队列中节点的定义
static final class Node {
//表明节点是否是一个数据节点。false表明是请求节点
final boolean isData;
//如果是一个数据节点,则初始化时填入数据的值。
//如果一个数据节点被匹配,线程会cas将item置换为null
//如果一个请求节点被匹配。线程会cas将item从null设置为数据的值
//如果一个节点因为超时或者线程中断需要取消。则会将item设置为节点自身。
//一个阻塞请求节点被其他线程给予数据并且唤醒后,会将item设置为自身。
//综上,当item被设置为节点自身时,意味着该节点已经失效
volatile Object item;
//当一个节点不再被head指针指向时,设置next为节点自身。这也意味着该节点已经脱离了队列
volatile Node next;
//当请求节点线程进入等待前,赋予该节点值
volatile Thread waiter;
}
LinkedTransferQueue中所有操作都浓缩在了一个方法之中。而这个方法通过定义个常量数字来区分不同的操作。定义如下:
private static final int NOW = 0; // for untimed poll, tryTransfer 非阻塞式提取
private static final int ASYNC = 1; // for offer, put, add 由于是无界队列,因此这三个方法是等效的
private static final int SYNC = 2; // for transfer, take 阻塞式提取
private static final int TIMED = 3; // for timed poll, tryTransfer
我们重点关注poll和offer的逻辑。首先来所有方法的入口方法xfer
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) {
//首先从head指针指向的节点开始,寻找第一个有效节点并确认是否匹配。
for (Node h = head, p = h; p != null;) {
boolean isData = p.isData;
Object item = p.item;
//item!=p 意味着不是取消节点
//(item != null) == isData 这个表达式很精巧。后文会看到更多类似的表达。这个表达式意味着节点本身没有被匹配掉。
if (item != p && (item != null) == isData) {
//确认节点模式是否相同。
//如果节点模式相同,则跳出循环。按照算法特性,队列中的有效节点都是一个模式的。因此第一个相同,就不用尝试后续的。
if (isData == haveData)
break;
//cas成功意味着匹配完成。如果本次是请求,则item原本是数据,e就为null反之则e是数据,item原本为null
if (p.casItem(item, e)) {
//匹配成功之后检查下head指针是否偏离最近非匹配节点的距离是否大于2
//对于for循环,启动条件只要匹配节点不是head节点就会尝试将head节点cas为本匹配节点的next节点或者本匹配节点当匹配节点没有next节点时
for (Node q = p; q != h;) {
Node n = q.next;
if (head == h && casHead(h, n == null ? q : n)) {
//cas head节点成功,则将之前的节点的next指针指向节点自身。这意味着节点脱离了队伍。同时,这也可以帮助gc。因为指向自身后,节点的不可达状态判断会更容易些。
//当节点的next指针指向自身,意味着该节点已经脱离队伍,是一个无效节点。这个是一个判断的充分条件。
h.forgetNext();
break;
}
//cas 失败后则检查当前head指针距离下一个有效节点是否大于2.大于则再次循环,否则退出。头节点的松弛长度由这段代码决定。从代码上可以看出,松弛距离是2.
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break;
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
//如果没有找到匹配节点就尝试下一个节点。如果该节点已经脱离队列,则从头开始。
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//没有找到匹配节点的情况下,并且不是无阻塞提取的操作的话,就构建一个节点,将自身节点放入队尾
if (how != NOW) {
if (s == null)
s = new Node(e, haveData);
//尝试将节点放入队尾。该方式是存在失败可能。此时需要重新开始完整流程。而s不需要初始化两次。保存作为临时变量即可。
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
//入队成功后,如果是同步或者超时类操作。
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
private Node tryAppend(Node s, boolean haveData)
{
//从尾节点开始,准备插入自身节点
for (Node t = tail, p = t;;)
{
Node n, u;
//如果队列为空,则尝试初始化
if (p == null && (p = head) == null)
{
if (casHead(null, s))
return s;
}
//判断当前节点是否可以成为本节点的前置节点。判断标准为:当前节点未被匹配并且数据模式与本节点不同则无法匹配。其他情况可以匹配。
else if (p.cannotPrecede(haveData))
//如果两个不同属性的节点同时入队则会造成此种情况。cas失败的一方就会发现前置节点和自身数据模式不同。此时要退出本方法,从xfer的循环重新开始
return null;
//p节点并非最后的一个节点,则向后移动。由于是松弛队列,此种情况较为常见。
else if ((n = p.next) != null)
//这个表达式看的相当拗口。不清楚这么做是否能提高效率。欢迎熟悉jit优化的朋友出来说下。
//将指针向后移动之前,为了保证效率。确认是否有必要检查tail指针已经有变化。如果有变化,则将p设置为tail节点,重新开始。否则的话就移动到p的后继节点。
//只有当p节点和一开始的t节点不相等的时候才需要检查tail指针是否变化,否则直接移动到后继节点。
//这个表达式做出的这些选择基本上是出于一种效率的选择。否则不停的向后移动指针也是正确的。但有时候会比较慢,特别是tail节点变化的情况下更意味着如此。
p = p != t && t != (u = tail) ? (t = u) :
(p != n) ? n : null;
//p节点可以放入的话就尝试cas,如果失败就读取起next节点。重新执行流程。
else if (!p.casNext(null, s))
p = p.next;
else
{
if (p != t)
{
//又是这种一坨的代码。心好累。
//首先看(tail != t || !casTail(t, s)) 。如果tail节点已经变化了,则尝试cas tail节点。如果失败,则判断当前的tail节点的松弛长度是否超过2.判断的代码就是后面的条件语句
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
//阻塞或者超时类操作
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1;
ThreadLocalRandom randomYields = null;
for (;;) {
Object item = s.item;
//此类操作,e都是null。当item不为null的时候意味着有数据放入并且可能唤醒了对应的等待线程。这说明节点已经被匹配,获得了想要的数据。
if (item != e) {
//将item设置为自身节点,清空waiter数据。对gc友好。
s.forgetContents();
return LinkedTransferQueue.<E>cast(item);
}
//线程中断或者超时到达,通过cas操作让节点失效。
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) {
//取消节点后,则尝试清理队列。这对于保持队列的高效,缩短无效路径有帮助。
unsplice(pred, s);
return e;
}
//spin是一个自旋策略。线程阻塞和唤醒是一个比较大的消耗操作。而在高并发竞争中,一般而言马上就可以获取到数据。因此在陷入阻塞之前进行自旋可以很有效的提高效率。
//jdk采用的自旋规则是,如果一个节点是队列中第一个有效节点,则自旋N次。如果不是则自旋N/2次。jdk对N的取值是128.在自旋的过程中可能还会随机执行线程让渡操作。此外,如果本节点和前置节点的模式不同,意味着本节点的操作数量可能更多,此时自旋操作数量要更多,也就是3/2N的数量。
if (spins < 0) {
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield();
}
else if (s.waiter == null) {
s.waiter = w;
}
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
//队列清理
final void unsplice(Node pred, Node s) {
//将item设置为自身节点,清空waiter数据。对gc友好。
s.forgetContents();
//if判断确认pred节点确实存在并且仍然是s的前置节点。这是一切开始的前提
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
//一般情况下就只是将前置节点的next设置为本节点的后置节点。让遍历路径中去掉该取消节点即可。
//而如果发现该取消节点是最后一个节点或者前置节点已经无效了,就需要做进一步的清理动作。
if (n == null ||(n != s && pred.casNext(s, n) && pred.isMatched())) {
//首先是帮忙清理头结点,如果头节点已经失效的话
for (;;) {
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head
}
//如果前置节点和本身节点都在队列中,与上面的条件结合,此时这两个节点都是无效节点。此时就对sweepVotes(清除投票计数)原子增。如果一个线程发现其触发了阀值。则执行全队列清除动作。
if (pred.next != pred && s.next != s) {
for (;;) {
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {
sweep();
break;
}
}
}
}
}
}
//全队列清除方法。这个方法没太多说的,就是从头节点开始,检测每一个节点的有效性
private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) // trailing node is pinned
break;
else if (s == n) // stale
// No need to also check for p == s, since that implies s == n
p = head;
else
p.casNext(s, n);
}
}
是的没有错,在这个具体实现中存在着一个不是很明显的bug。该bug应该是被国人最早发现。并且提交的oracle了。但是在jdk8中仍然尚未修复。oracle的解释是难以复现,不懂是不是借口。因为根据算法实际上是很好复现的。下面来说下具体的bug步骤。
看过步骤,我们可以看下重现bug的代码。该段代码来自并发编程网。该位仁兄应该是最早发现这个bug的人。
只要在linkedtransferqueue中的tryAppend方法打断点,并且按照上面的步骤顺序让线程运行,bug就会重现。
public static void main(String[] args)
{
final BlockingQueue<Long> queue = new LinkedTransferQueue<Long>();
Runnable offerTask = new Runnable() {
public void run()
{
queue.offer(8L);
System.out.println("offerTask thread has gone!");
}
};
Runnable takeTask = new Runnable() {
public void run()
{
try
{
System.out.println(Thread.currentThread().getId() + " " + queue.take());
}
catch (InterruptedException e)
{
}
}
};
Runnable takeTaskInterrupted = new Runnable() {
public void run()
{
Thread.currentThread().interrupt();
try
{
System.out.println(Thread.currentThread().getId() + " " + queue.take());
}
catch (InterruptedException e)
{
System.out.println(e + " " + Thread.currentThread().getId());
}
}
};
new Thread(offerTask,"offer").start();
new Thread(takeTask,"take").start();
new Thread(takeTaskInterrupted,"takeAndCancelImmediately").start();
}