[关闭]
@kiraSally 2018-03-12T19:04:10.000000Z 字数 18279 阅读 2815

并发番@ConditionObject一文通

Java 1.8版本 并发


  • 鉴于JDK文档的英文注释过多,因此笔者选择摘录部分笔者认为最重要的部分,其他内容会以中文注释展现
  • 笔者个人博客 kiraSally的掘金个人博客 感谢支持

1.Condition接口

Condition接口意义在于提供一种针对使用条件状态(队列或变量)的线程进行阻塞和唤醒的机制 :

阻塞: 阻塞线程进入等待状态同时释放关联锁,直到被其他线程将条件状态设置为true为止

唤醒: 唤醒等待在条件队列中的线程,同时提供对唤醒全部的支持

锁关联: 值得注意的是,由于多线程下该状态是共享的,因此通常会通过于锁关联保证其原子性

解除等待: 当线程解除等待状态(被唤醒、中断、超时等)后仍需重新竞争锁,获取锁后才能够从暂停位置开始继续往后执行

  1. /*
  2. * Conditions (also known as `condition queues` or `condition variables`)
  3. * provide a means for one thread to suspend execution to wait
  4. * until notified by another thread that some state condition may now be true.
  5. *
  6. * Conditions(如条件队列或条件变量)的作用如下:
  7. * 暂停线程并进入等待状态,直到被其他线程将条件状态设置为true为止
  8. * 值得注意的是:由于多线程下该状态是共享的,因此通常会通过于锁关联保证其原子性
  9. * 如Lock#newCondition newCondition()方法
  10. */
  11. public interface Condition {
  12. /**
  13. * Causes the current thread to wait until it is signalled or {Thread#interrupt interrupted}
  14. * 线程等待直到被唤醒或被中断
  15. * 阻塞方法的实现有几个准则:
  16. * 1.当前线程持有的锁(关联该条件变量)将被释放
  17. * 2.当线程解除等待状态后仍需重新竞争锁,获取锁后才能够从当前位置开始继续往后执行
  18. */
  19. void await() throws InterruptedException;
  20. /**
  21. * Causes the current thread to wait until it is signalled.
  22. * 线程等待直到被唤醒
  23. */
  24. void awaitUninterruptibly();
  25. /**
  26. * Causes the current thread to wait until it is signalled or interrupted,
  27. * or the specified waiting time elapses.
  28. * 线程等待直到被唤醒或被中断或超时
  29. */
  30. long awaitNanos(long nanosTimeout) throws InterruptedException;
  31. /**
  32. * Causes the current thread to wait until it is signalled or interrupted,
  33. * or the specified waiting time elapses. This method is behaviorally
  34. * equivalent to: {@code awaitNanos(unit.toNanos(time)) > 0}
  35. * 线程等待直到被唤醒或被中断或超时
  36. * 该方法等同于 awaitNanos(unit.toNanos(time)) > 0
  37. */
  38. boolean await(long time, TimeUnit unit) throws InterruptedException;
  39. /**
  40. * Causes the current thread to wait until it is signalled or interrupted,
  41. * or the specified deadline elapses.
  42. * 线程等待直到被唤醒或被中断或超时
  43. */
  44. boolean awaitUntil(Date deadline) throws InterruptedException;
  45. /**
  46. * Wakes up one waiting thread.
  47. * 唤醒在条件队列中等待的一个线程
  48. * 注意:线程被唤醒后仍需重新竞争锁,获取锁后才能够从当前位置开始继续往后执行
  49. */
  50. void signal();
  51. /**
  52. * Wakes up all waiting threads.
  53. * 唤醒在条件队列中等待的所有线程
  54. */
  55. void signalAll();
  56. }

2.ConditionObject综述

ConditionObject包括如下方面:

定位: ConditionObject是AQS的内部类,其通过实现Condition接口提供对管程形式的条件的支持

作用: ConditionObject在AQS中作为对Lock的实现支持

使用: 当条件不满足时线程入队,当条件满足时出队并重新尝试获取锁,获取成功后从暂停位置开始继续往后执行

3.ConditionObject实现原理

ConditionObject的实现有几个注意事项:

1.内部会维护一个条件等待同步队列,根据FIFO原则执行入队出队操作

2.通过firstWaiter维护头节点,lastWaiter维护尾节点

3.节点通过nextWaiter记录后继条件节点形成链表结构,遍历时从头节点开始沿着nextWaiter顺序遍历

4.条件队列中节点状态waitStatus只能为0或Node.CONDITION

5.使用条件队列的前提是线程需要持有同步锁,且只支持独占模式

6.当节点在条件队列被唤醒(因signal|timeout|interrupt)后,需要进行节点转移,即由条件节点转变为同步节点

4.ConditionObject组成

4.1 类定义

  1. public class ConditionObject implements Condition, java.io.Serializable

4.2 构造器

  1. public ConditionObject() { }

4.3 重要变量

  1. /** 条件等待队列首节点 */
  2. private transient Node firstWaiter;
  3. /** 条件等待队列尾节点 */
  4. private transient Node lastWaiter;
  5. /** 退出时重新中断*/
  6. private static final int REINTERRUPT = 1;
  7. /** 退出时直接抛出异常 */
  8. private static final int THROW_IE = -1;
  9. /** 独占模式时才能使用条件队列 ,用于链接下一个等待节点 */
  10. Node nextWaiter;

5.条件节点阻塞

5.1 不响应中断阻塞

条件节点阻塞的一般流程:

1.入队: 根据FIFO原则,新节点会被封装成Node并被加入到条件队列队尾

2.释放: 为不影响其他线程,当前已持有锁的线程需要先释放所有锁,让出锁资源

3.阻塞: 在条件队列中的节点线程需要被阻塞,直到条件满足后重试获取同步锁

4.重试: 当条件满足后需要重试获取同步锁,获取成功后才能继续从暂停位置继续向后执行

补充:条件满足指的是线程被其他线程唤醒、或超时、或中断后且位于同步队列中

  1. public final void awaitUninterruptibly() {
  2. //1.新节点入条件队列
  3. Node node = addConditionWaiter();
  4. //2.当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放锁
  5. int savedState = fullyRelease(node);
  6. //3.阻塞在条件队列中的不满足条件的节点线程
  7. boolean interrupted = false;
  8. while (!isOnSyncQueue(node)) {
  9. LockSupport.park(this);
  10. //由于不响应中断,因此只是记录是否中断,此时会清除中断标识
  11. if (Thread.interrupted())
  12. interrupted = true;
  13. }
  14. /**
  15. * 4.被唤醒需要重新尝试获取锁,获取锁成功后才能继续往后执行
  16. * 若期间又被中断后,需要再次设置已被清除的中断标识
  17. */
  18. if (acquireQueued(node, savedState) || interrupted)
  19. selfInterrupt();
  20. }

小问:为神马需要循环判断isOnSyncQueue? 即while (!isOnSyncQueue(node)) ?
友情小提示:读者可以回顾一下同步队列获取锁的过程

小答:循环只有一个目的,就是确保节点已经脱离条件队列且进入同步队列,这样才有资格重新获取同步锁


5.2 响应中断阻塞

  1. public final void await() throws InterruptedException {
  2. //响应中断,直接抛异常,没必要往下走了
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. //新节点入条件队列
  6. Node node = addConditionWaiter();
  7. //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放锁
  8. int savedState = fullyRelease(node);
  9. //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
  10. int interruptMode = 0;
  11. while (!isOnSyncQueue(node)) {
  12. LockSupport.park(this);
  13. /**
  14. * 检测中断,一旦发生中断
  15. * 1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
  16. * 2.退出循环 -> 接下来会在循环外进行中断处理
  17. * 注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
  18. * 这样随后就能安心执行acquireQueued同步队列入队操作了
  19. */
  20. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  21. break;
  22. }
  23. /**
  24. * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
  25. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  26. */
  27. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  28. interruptMode = REINTERRUPT;
  29. /**
  30. * 若当前节点存在后继节点时,需要执行出队操作
  31. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  32. */
  33. if (node.nextWaiter != null) // clean up if cancelled
  34. unlinkCancelledWaiters();
  35. //interruptMode != 0 说明是需要进行中断处理的
  36. if (interruptMode != 0)
  37. //执行中断处理
  38. reportInterruptAfterWait(interruptMode);
  39. }
  1. /**
  2. * 中断处理:
  3. * 1.中断模式为THROW_IE:向上抛出异常
  4. * 2.中断模式为REINTERRUPT:重新设置中断状态
  5. */
  6. private void reportInterruptAfterWait(int interruptMode)
  7. throws InterruptedException {
  8. if (interruptMode == THROW_IE)
  9. throw new InterruptedException();
  10. else if (interruptMode == REINTERRUPT)
  11. selfInterrupt();
  12. }

5.3 响应中断和超时阻塞-纳秒可选

  1. /**
  2. * Implements timed condition wait.
  3. * 响应中断和超时阻塞
  4. * @return long 剩余超时时间 <0 说明已超时
  5. */
  6. public final long awaitNanos(long nanosTimeout)
  7. throws InterruptedException {
  8. //响应中断,直接抛异常,没必要往下走了
  9. if (Thread.interrupted())
  10. throw new InterruptedException();
  11. //新节点入条件队列
  12. Node node = addConditionWaiter();
  13. //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
  14. int savedState = fullyRelease(node);
  15. //截止时长
  16. final long deadline = System.nanoTime() + nanosTimeout;
  17. //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
  18. int interruptMode = 0;
  19. while (!isOnSyncQueue(node)) {
  20. /**
  21. * 将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
  22. * 值得注意的是:剩余超时时间允许为0和负数
  23. */
  24. if (nanosTimeout <= 0L) {
  25. transferAfterCancelledWait(node);
  26. break;
  27. }
  28. //为了提高效率,超过阈值才执行阻塞,否则接着自旋
  29. if (nanosTimeout >= spinForTimeoutThreshold)
  30. //若超过nanosTimeout时长,会自动解除阻塞唤醒线程
  31. LockSupport.parkNanos(this, nanosTimeout);
  32. /**
  33. * 检测中断,一旦发生中断
  34. * 1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
  35. * 2.退出循环 -> 接下来会在循环外进行中断处理
  36. * 注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
  37. * 这样随后就能安心执行acquireQueued同步队列入队操作了
  38. */
  39. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  40. break;
  41. //剩余超时时间
  42. nanosTimeout = deadline - System.nanoTime();
  43. }
  44. /**
  45. * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
  46. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  47. */
  48. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  49. interruptMode = REINTERRUPT;
  50. /**
  51. * 若当前节点存在后继节点时,需要执行出队操作
  52. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  53. */
  54. if (node.nextWaiter != null)
  55. unlinkCancelledWaiters();
  56. //interruptMode != 0 说明是需要进行中断处理的
  57. if (interruptMode != 0)
  58. //执行中断处理
  59. reportInterruptAfterWait(interruptMode);
  60. //返回已花费时长
  61. return deadline - System.nanoTime();
  62. }

5.4 响应中断和超时阻塞-纳秒日期单位可选

  1. /**
  2. * Implements timed condition wait.
  3. * 响应中断和超时阻塞-纳秒日期单位可选
  4. * @return boolean 超时是否发生在被唤醒之前
  5. */
  6. public final boolean await(long time, TimeUnit unit)
  7. throws InterruptedException {
  8. //注意是纳秒
  9. long nanosTimeout = unit.toNanos(time);
  10. //响应中断,直接抛异常,没必要往下走了
  11. if (Thread.interrupted())
  12. throw new InterruptedException();
  13. //新节点入条件队列
  14. Node node = addConditionWaiter();
  15. //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
  16. int savedState = fullyRelease(node);
  17. //截止时长
  18. final long deadline = System.nanoTime() + nanosTimeout;
  19. //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
  20. boolean timedout = false;
  21. int interruptMode = 0;
  22. while (!isOnSyncQueue(node)) {
  23. /**
  24. * 将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
  25. * 值得注意的是:剩余超时时间允许为0和负数
  26. */
  27. if (nanosTimeout <= 0L) {
  28. timedout = transferAfterCancelledWait(node);
  29. break;
  30. }
  31. //为了提高效率,超过阈值才执行阻塞,否则接着自旋
  32. if (nanosTimeout >= spinForTimeoutThreshold)
  33. //若超过nanosTimeout时长,会自动解除阻塞唤醒线程
  34. LockSupport.parkNanos(this, nanosTimeout);
  35. /**
  36. * 检测中断,一旦发生中断
  37. * 1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
  38. * 2.退出循环 -> 接下来会在循环外进行中断处理
  39. * 注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
  40. * 这样随后就能安心执行acquireQueued同步队列入队操作了
  41. */
  42. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  43. break;
  44. //剩余超时时间
  45. nanosTimeout = deadline - System.nanoTime();
  46. }
  47. /**
  48. * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
  49. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  50. */
  51. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  52. interruptMode = REINTERRUPT;
  53. /**
  54. * 若当前节点存在后继节点时,需要执行出队操作
  55. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  56. */
  57. if (node.nextWaiter != null)
  58. unlinkCancelledWaiters();
  59. //interruptMode != 0 说明是需要进行中断处理的
  60. if (interruptMode != 0)
  61. //执行中断处理
  62. reportInterruptAfterWait(interruptMode);
  63. return !timedout;
  64. }

5.5 响应中断和超时阻塞-毫秒日期可选

  1. /**
  2. * Implements absolute timed condition wait.
  3. * 响应中断和超时阻塞-毫秒日期可选
  4. * @return boolean 超时是否发生在被唤醒之前
  5. */
  6. public final boolean awaitUntil(Date deadline)
  7. throws InterruptedException {
  8. //注意:与awaitNano区别的是这里用的是毫秒
  9. long abstime = deadline.getTime();
  10. //响应中断,直接抛异常,没必要往下走了
  11. if (Thread.interrupted())
  12. throw new InterruptedException();
  13. //新节点入条件队列
  14. Node node = addConditionWaiter();
  15. //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
  16. int savedState = fullyRelease(node);
  17. //记录节点是否超时
  18. boolean timedout = false;
  19. //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
  20. int interruptMode = 0;
  21. while (!isOnSyncQueue(node)) {
  22. //毫秒超时,将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
  23. if (System.currentTimeMillis() > abstime) {
  24. /**
  25. * 记录节点是否超时
  26. * 注意:当取消发生在节点被唤醒之前才返回true
  27. */
  28. timedout = transferAfterCancelledWait(node);
  29. //转换一定会成功,因此安心退出即可
  30. break;
  31. }
  32. //若超过abstime时长,会自动解除阻塞唤醒线程
  33. LockSupport.parkUntil(this, abstime);
  34. /**
  35. * 检测中断,一旦发生中断
  36. * 1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
  37. * 2.退出循环 -> 接下来会在循环外进行中断处理
  38. * 注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
  39. * 这样随后就能安心执行acquireQueued同步队列入队操作了
  40. */
  41. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  42. break;
  43. }
  44. /**
  45. * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
  46. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  47. */
  48. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  49. interruptMode = REINTERRUPT;
  50. /**
  51. * 若当前节点存在后继节点时,需要执行出队操作
  52. * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
  53. */
  54. if (node.nextWaiter != null)
  55. unlinkCancelledWaiters();
  56. //interruptMode != 0 说明是需要进行中断处理的
  57. if (interruptMode != 0)
  58. //执行中断处理
  59. reportInterruptAfterWait(interruptMode);
  60. return !timedout;
  61. }

6.条件节点唤醒

唤醒操作主要干了三个事情:

1.清除节点: 从条件队列中移除该节点

2.节点转移: 将条件节点转换为同步节点,即从条件队列转移到同步队列

3.唤醒节点: 将转移成功的节点重新唤醒

6.1 唤醒单个

唤醒单个的实质:根据FIFO原则唤醒first,即条件队列头节点

  1. /**
  2. * Moves the longest-waiting thread, if one exists, from the
  3. * wait queue for this condition to the wait queue for the
  4. * owning lock.
  5. *
  6. * 从条件队列中唤醒一个节点
  7. * 原则:将条件队列中已存在的等待时间最长的线程转移到等待队列中,即头节点
  8. */
  9. public final void signal() {
  10. //条件队列只适用于独占模式且只能由持有锁的线程执行唤醒操作
  11. if (!isHeldExclusively())
  12. throw new IllegalMonitorStateException();
  13. //先进先出原则
  14. Node first = firstWaiter;
  15. if (first != null)
  16. doSignal(first);
  17. }
  18. /**
  19. * Removes and transfers nodes until hit non-cancelled one or null.
  20. * @param first (non-null) the first node on condition queue 条件队列非空头节点
  21. */
  22. private void doSignal(Node first) {
  23. do {
  24. //清空队列
  25. if ( (firstWaiter = first.nextWaiter) == null)
  26. lastWaiter = null;
  27. first.nextWaiter = null;
  28. /**
  29. * 将节点从条件队列转换到同步队列中并唤醒线程
  30. */
  31. } while (!transferForSignal(first) && (first = firstWaiter) != null);
  32. }

6.2 唤醒全部

唤醒全部的实质:沿着nextWaiter顺序遍历依次唤醒

  1. /**
  2. * Moves all threads from the wait queue for this condition to
  3. * the wait queue for the owning lock.
  4. *
  5. * 唤醒条件队列中的全部节点
  6. */
  7. public final void signalAll() {
  8. if (!isHeldExclusively())
  9. throw new IllegalMonitorStateException();
  10. Node first = firstWaiter;
  11. if (first != null)
  12. //跟signal的区别就是多了个All...
  13. doSignalAll(first);
  14. }
  15. /**
  16. * Removes and transfers all nodes.
  17. *
  18. * 全部唤醒的实质:
  19. * 沿着nextWaiter顺序遍历依次转移并唤醒
  20. */
  21. private void doSignalAll(Node first) {
  22. //1.注意要清空条件队列
  23. lastWaiter = firstWaiter = null;
  24. //2.沿着nextWaiter顺序遍历依次唤醒
  25. do {
  26. Node next = first.nextWaiter;
  27. first.nextWaiter = null;
  28. transferForSignal(first);
  29. //3.其实质就是沿着nextWaiter顺序遍历
  30. first = next;
  31. } while (first != null);
  32. }

7.条件节点转移

转移操作指的是节点由条件节点转换为同步节点,主要进行了如下两步进行转移:
1.更新节点状态: 根据同步队列入队原则,新节点初始状态必须为0
2.同步队列入队: 当前节点肯定能入队成功,同时返回前驱节点

7.1 因正常唤醒转移

  1. /**
  2. * Transfers a node from a condition queue onto sync queue.
  3. * Returns true if successful.
  4. *
  5. * 将节点从条件队列转换到同步队列中并唤醒线程
  6. * 1.在正常调用signalXXX()方法时才会调用该方法
  7. * 2.若因为超时或中断进行转移,不会调用该方法,但这两种情况的转移都会置为0
  8. * 3.注意:在调用该方法之前都会执行first.nextWaiter = null,即从等待队列中移除当前头节点
  9. *
  10. * @param node the node 根据FIFO原则,通常为first节点
  11. * @return true if successfully transferred (else the node was
  12. * cancelled before signal) 转换成功或节点在唤醒前被取消 才返回true
  13. */
  14. final boolean transferForSignal(Node node) {
  15. /**
  16. * 1.CAS更新节点状态为0,一旦失败立即返回false
  17. * 注意:必须将节点状态更新为0,因为同步队列入队时新节点必须为0,
  18. * 否则就不符合同步队列入队原则,因此一旦失败立即返回
  19. * 补充:CAS失败的原因在于其他线程已经执行唤醒操作将该节点变更为0,因此其实无需再次enq了
  20. */
  21. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  22. return false;
  23. //2.进入同步队列,注意enq方法返回的是前驱节点
  24. Node p = enq(node);
  25. /**
  26. * 3.在因前驱节点被取消或状态突变发生后需要唤醒节点线程
  27. * ws > 0:
  28. * 说明前驱节点被取消(CANCELLED),因此需要唤醒当前节点线程
  29. * !compareAndSetWaitStatus(p, ws, Node.SIGNAL):
  30. * 此时发生状态突变,比如ws刚好变成CANCELLED
  31. * 补充:因为enq保证一定入队成功,因此实质是唤醒在同步队列中的节点
  32. * 结合awaitXXX(),线程会从park之后继续往后执行
  33. *
  34. * !!特别注意!!:正常情况下,根据流程可知
  35. * ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
  36. * 这种判断是不会返回true的,此时是不会唤线程;
  37. * 因此真正唤醒线程的地方在于调用signal()方法的线程发送完signal
  38. * 信号后再调用release(1)方法(比如调用ReentrantLock的unlock()),
  39. * 因为其已入队,因此可以被唤醒
  40. */
  41. int ws = p.waitStatus.
  42. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  43. LockSupport.unpark(node.thread);
  44. return true;
  45. }

小问:读者还记得enq方法都做了什么吗?
友情小提示:读者可以回顾一下并发番@AbstractQueuedSynchronizer一文通

小答:针对enq方法,笔者简单回顾一下作用,帮助读者理解一下:

1.初始化时对head和tail进行赋值

2.当前节点进入同步对了并作为新的tail

3.入队一定可以成功因为自旋

4.注意enq返回的是前驱节点


7.2 因中断或超时唤醒转移

  1. /**
  2. * Transfers node, if necessary, to sync queue after a cancelled wait.
  3. * Returns true if thread was cancelled before being signalled.
  4. *
  5. * 将条件队列中因中断或超时而唤醒的节点进行转移
  6. * 取消发生在被唤醒之前返回true
  7. * 两种情况需要调用该方法:
  8. * 1.中断:checkInterruptWhileWaiting() -> 该方法会被awaitXXX()调用
  9. * 2.超时:带超时的awaitXXX()方法
  10. */
  11. final boolean transferAfterCancelledWait(Node node) {
  12. //Node.CONDITION状态节点只用于条件队列,因此需要设置为0才能进入同步队列
  13. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  14. //当CAS变更waitStatus为0时,说明节点已跟条件队列无关,随后进入同步队列即可
  15. enq(node);
  16. //由于enq方法能够保证进入同步队列成功,因此当enq执行完毕,可以放心的返回true
  17. return true;
  18. }
  19. //若CAS失败且节点还不在同步队列中,线程需要先释放资源,提高效率
  20. while (!isOnSyncQueue(node))
  21. Thread.yield();
  22. return false;
  23. }

小问:为什么需要执行Thread.yield()?
友情小提示:读者可以从效率优化角度考虑

小答:对于这个问题,笔者的考虑如下:
1.若CAS变更为0失败,说明已被其他线程执行唤醒转换并变更为0
2.但由于enq操作可能还在进行中,因此此时该节点可能还没真正入队,因此需要循环检测是否已入队
3.由于入队操作已经在执行中,因此无需重复执行enq操作,可以先释放资源,并通过isOnSyncQueue退出循环即可,优化效率

8.条件节点队列操作

8.1 条件节点入队

新节点入队遵循FIFO原则,主要会做如下操作:

1.清洗: 若尾节点非CONDITION状态,需要清除所有非CONDITION状态节点并重设头尾节点

2.封装: 封装当前线程为Node,同时初始化节点状态为CONDITION

3.入队: 队列为空时,头尾节点指向同一个元素;非空时通过nextWaiter形成链表

  1. /**
  2. * Adds a new waiter to wait queue.
  3. *
  4. * 新增条件等待队列队尾节点 - FIFO
  5. * 补充:由于条件队列只适用于独占模式,因此该方法不会有并发问题
  6. * @return its new wait node 返回新节点
  7. */
  8. private Node addConditionWaiter() {
  9. //FIFO原则:新入队元素需队尾插入
  10. Node t = lastWaiter;
  11. //1.若尾节点非Condition状态,需要清除所有非CONDITION状态节点并重设头尾节点
  12. if (t != null && t.waitStatus != Node.CONDITION) {
  13. /**
  14. * 删除条件等待队列中的所有非CONDITION状态节点
  15. * 补充:由于在条件队列中节点状态只能是CONDITION或0(signal|timeout|interrupt)
  16. * 因此该方法实质就清除空节点或状态为0节点
  17. */
  18. unlinkCancelledWaiters();
  19. //上个方法执行后会使得lastWaiter肯定为非取消状态节点(只可能为空或CONDITION状态)
  20. t = lastWaiter;
  21. }
  22. //2.封装当前线程为Node,同时初始化节点状态为CONDITION
  23. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  24. //3.队尾元素为空,正说明条件队列为空
  25. if (t == null)
  26. firstWaiter = node;
  27. else
  28. //4.新元素入队需队尾插入
  29. t.nextWaiter = node;
  30. //5.这里需要注意的是:队列为空时,头尾节点是指向同一个元素的;非空时就会形成链表
  31. lastWaiter = node;
  32. return node;
  33. }

小问:何时出现t != null && t.waitStatus != Node.CONDITION?
友情小提示:读者可以考虑一下条件节点状态的变更时机

小答:尾节点的状态不为Node.CONDITION,那只可能为0,这意味着节点需要转移,那么节点状态变更时机都有哪些呢?

1.当因中断或超时被唤醒后会通过调用transferAfterCancelledWait将节点状态CAS为0

2.当被正常唤醒后会通过调用transferForSignal将节点状态CAS为0

分析:但问题是正常唤醒后会先执行first.nextWaiter = null,因此此时尾节点应为空

结论:排除这种情况后可知,只有第一种情况,即因中断或超时被唤醒后才会出现这种情况

那么问题来了,什么时候节点才真正出队呢?聪明的读者可以先思考一下~


8.2 条件节点出队

条件节点的出队时机:

1.新节点入队: 通过判断t.waitStatus != Node.CONDITION为true时执行unlinkCancelledWaiters()

2.节点重新尝试获取同步锁后: 通过判断node.nextWaiter != null为true时执行unlinkCancelledWaiters()

3.节点被正常唤醒后: 通过调用signal()方法并执行first.nextWaiter = null

补充:3与1,2的区别在于前者只是清除头节点,后者是遍历清除所有非CONDITION状态节点
注意:无论入队还是出队,前提都是线程已经持有同步锁

  1. /**
  2. * Unlinks cancelled waiter nodes from condition queue.
  3. *
  4. * 移除条件队列中的所有非CONDITION状态节点
  5. * 两种情况会触发该方法:
  6. * 1.新节点加入条件队列时 -> addConditionWaiter()
  7. * 2.节点被唤醒之后 -> awaitXXX()
  8. * 补充一点:注意不是在signal()方法中执行的(因为后者只有转移和唤醒操作,前者才有获取锁操作)
  9. * 注意:条件队列中的节点只有CONDITION和0两种状态
  10. */
  11. private void unlinkCancelledWaiters() {
  12. Node t = firstWaiter;
  13. //临时节点,主要用于在遍历时记录上一个非CONDITION节点(因为要跳过所有非CONDITION节点)
  14. Node trail = null;
  15. //从前往后顺序遍历条件队列,剔除全部非CONDITION状态节点
  16. while (t != null) {
  17. //下一个节点,注意next可能为空
  18. Node next = t.nextWaiter;
  19. /**
  20. * 非CONDITION状态(即0),需要执行移除操作
  21. * 强调:在条件队列中节点的waitStatus,只可能是CONDITION或是0(signal|timeout|interrupt)
  22. */
  23. if (t.waitStatus != Node.CONDITION) {
  24. /**
  25. * 移除后继节点 等同于 将当前节点从队列中移除
  26. * 同时由于不再有引用,会help GC
  27. * 注意:GC同时会考虑firstWaiter和lastWaiter的引用情况
  28. * 即若当前线程无用,上述变量最终也会移除对该节点的引用
  29. */
  30. t.nextWaiter = null;
  31. /**
  32. * 若之前没有非CONDITION状态节点,就先让next当一下头头
  33. * 注意:该方法执行完毕后,firstWaiter只能为空或CONDITION
  34. */
  35. if (trail == null)
  36. firstWaiter = next;
  37. else
  38. /**
  39. * 一旦当前节点非CONDITION状态,那么需要先将它的后继节点与上一个非取消节点建立起联系
  40. * 即使后继节点是非CONDITION状态也没事,因为在下次遍历时会重新建立联系的
  41. * 说白了,其本质就是删除t
  42. */
  43. trail.nextWaiter = next;
  44. /**
  45. * 最后一个非CONDITION状态节点即是条件队列的尾节点
  46. * 注意:该方法执行完毕后,lastWaiter最终只能为空或CONDITION
  47. */
  48. if (next == null)
  49. lastWaiter = trail;
  50. }
  51. else
  52. //CONDITION状态,就记录一下以作为下次遍历时的上一个非CONDITION节点
  53. trail = t;
  54. //开启下一次循环
  55. t = next;
  56. }
  57. }

小问:为神马调用unlinkCancelledWaitersfirstWaiter只能为空或CONDITION?
友情小提示:一时间想不清楚的话可以使用分类讨论法

小答:头节点非空时,有且只有条件队列中全部都是非CONDITION状态的节点时,新的头节点才能为空,原因在于firstWaiter = next;的前提是无可用的CONDITION状态节点,而trail只有在有CONDITION状态节点时才会被赋值更新


9.重要方法

9.1 释放同步锁

  1. /**
  2. * Invokes release with current state value; returns saved state.
  3. * Cancels node and throws exception on failure.
  4. *
  5. * 释放同步状态并返回原状态
  6. * 当释放失败时节点会被取消同时抛出IllegalMonitorStateException异常
  7. */
  8. final int fullyRelease(Node node) {
  9. boolean failed = true;
  10. try {
  11. //获取当前线程持有锁的同步状态 -- 支持可重入
  12. int savedState = getState();
  13. /**
  14. * 由于该方法只用于独占模式,因此使用的是独占独有的release方法
  15. * 关于release方法请参见笔者的并发番@AbstractQueuedSynchronizer一文通
  16. * 作用是更新state状态(为0)同时唤醒后继节点(如果存在的话)
  17. * 释放锁的目的是为了让其他线程能够获取锁去执行任务,
  18. * 并等到其他线程调用signal()和release()后能够唤醒该线程
  19. */
  20. if (release(savedState)) {
  21. failed = false;
  22. return savedState;
  23. } else {
  24. //一旦释放失败,直接抛出异常 -- 干脆的很
  25. throw new IllegalMonitorStateException();
  26. }
  27. } finally {
  28. /**
  29. * 释放失败的补救措施:
  30. * 由于实际上节点已完成使命,节点状态需要变成取消状态以用于跳过和回收
  31. * 注意:此时节点还是同步节点,因此需要设置为CANCELLED
  32. */
  33. if (failed)
  34. node.waitStatus = Node.CANCELLED;
  35. }
  36. }

9.2 检测中断

  1. /**
  2. * Checks for interrupt, returning THROW_IE if interrupted
  3. * before signalled, REINTERRUPT if after signalled, or
  4. * 0 if not interrupted.
  5. *
  6. * 检测线程的中断情况,通过返回状态码告知线程下一步应如何处理中断,情况有如下三种:
  7. * 1.中断发生在被唤醒之前返回THROW_IE(需要抛出异常)
  8. * 2.中断发生在被唤醒之后返回REINTERRUPT(需要重新中断)
  9. * 3.无中断返回0
  10. */
  11. private int checkInterruptWhileWaiting(Node node) {
  12. return Thread.interrupted() ? (transferAfterCancelledWait(node) ?
  13. THROW_IE : REINTERRUPT) : 0
  14. }

小问:为神马中断发生在被唤醒之前返回THROW_IE?之后返回REINTERRUPT?
友情小提示:读者可以回顾同步队列获取锁时的处理逻辑,作用相同

小答:唤醒前抛出异常主要是为了快速失败,提高效率,而重新中断是要处理被唤醒后一系列

唤醒前: 若被唤醒之前线程被中断,说明线程此时没有获取到资源,尽快抛出异常就可以结束等待并解放生产力;

唤醒后: 若被唤醒后线程被中断,说明线程基本已经获取锁,这时可能要多执行一些操作,如释放锁等


9.3 判断节点是否位于同步队列

  1. /**
  2. * Returns true if a node, always one that was initially placed on
  3. * a condition queue, is now waiting to reacquire on sync queue.
  4. *
  5. * 判断节点是否位于同步队列
  6. * @param node the node
  7. * @return true if is reacquiring
  8. */
  9. final boolean isOnSyncQueue(Node node) {
  10. //CONDITION只用于条件队列 || prev为空说明其一定不在同步队列中
  11. if (node.waitStatus == Node.CONDITION || node.prev == null)
  12. return false;
  13. //如果next非空,它肯定在同步队列中
  14. if (node.next != null)
  15. return true;
  16. //node.prev非空并不意味着在同步队列中,因此需要从后往前遍历以判断是否在同步队列中
  17. return findNodeFromTail(node);
  18. }
  1. /**
  2. * Returns true if node is on sync queue by searching backwards from tail.
  3. *
  4. * node.prev非空并不意味着在同步队列中,原因是CAS(放入同步队列)可能失败
  5. * 因此需要从后往前遍历以判断其是否在同步队列中
  6. * 由于该方法被调用时,该节点总是靠近tail,因此除非CAS失败(可能性很低),否则几乎无须遍历
  7. */
  8. private boolean findNodeFromTail(Node node) {
  9. //从同步队列的队尾节点开始沿着prev依次往前遍历
  10. Node t = tail;
  11. for (;;) {
  12. //存在,返回true
  13. if (t == node)
  14. return true;
  15. //都遍历到头了还不存在,那只能返回false了
  16. if (t == null)
  17. return false;
  18. //沿着prev依次往前遍历吧
  19. t = t.prev;
  20. }
  21. }

小问: 为神马prev为空不在同步队列?prev非空不意味着在同步队列?next非空肯定在同步队列?
友情小提示:迷惑的小伙伴可以看看笔者的并发番@AbstractQueuedSynchronizer一文通


10.流程图

10.1 无响应中断

注意:仅当调用signal()的线程再调用release()方法之后才会真正解除被阻塞线程的阻塞状态,但release()之后该线程并不是被立即唤醒,而是重新竞争锁直到变成head节点的后继节点且head节点为SIGNAl状态时才能被真正唤醒

条件队列.jpg-523.3kB

10.2 响应中断

中断的条件队列.jpg-821.2kB

10.3 响应中断与超时

超时中断的条件队列.jpg-1071.5kB


并发番@ConditionObject一文通黄志鹏kira 创作,采用 知识共享 署名-非商业性使用 4.0 国际 许可协议 进行许可。

本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注