[关闭]
@kiraSally 2021-05-26T15:42:03.000000Z 字数 33534 阅读 3663

并发番@AbstractQueuedSynchronizer一文通

并发 1.8版

AQS概述

AQS综述

AQS(队列同步器)是一个用来构建锁和同步器的框架,Doug Lea期望其作为大部分同步需求的基础:

作用: AQS是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义

功能: AQS框架提供实现阻塞锁和依赖FIFO等待队列的关联同步器

按需调用: AQS并不会实现任何同步接口,相反仅是提供一些方法以便具体锁和关联同步器按需调用

适用: 该类适用于依赖单个原子int变量表示同步状态的多种形式的同步器

原理: 内部使用一个volatile int变量表示同步状态,通过FIFO同步队列实现资源获取线程的排队工作,通过UnSafe实现底层的等待与唤醒操作,通过ConditionObject实现条件变量的使用

AQS组件

整个AQS由4个核心组件构成:

同步状态:「volatile int State」变量,该状态其实质就是可用资源数(因此是数值而不是布尔值)

Node节点: 队列操作的基本元素,线程入队前会先被封装成Node节点,其会记录队列操作所需的重要属性

同步队列: FIFO等待队列,CLH锁的变种实现并同时支持独占和共享模式,当线程获取锁失败会进入同步队列中等待,成功获取锁或因中断、异常等原因获取锁失败时出队

条件队列: 只用于独占模式, 且使用Condition的前提是线程已经获取到锁,并发番@ConditionObject一文通

AQS核心功能

加锁


AQS中加锁的基本流程如下(以独占模式为例):

- 论文版:

  1. //循环判断同步状态是否可取
  2. while (synchronization state does not allow acquire) {
  3. //不可取时,线程入同步队列(若尚未进入同步队列)
  4. enqueue current thread if not already queued;
  5. //阻塞当前线程
  6. possibly block current thread;
  7. }
  8. //成功获取锁后出队(当然异常失败也需要出队)
  9. dequeue current thread if it was queued;

- 实现版:

  1. //1.tryAcquire会CAS更新State,更新成功获取锁,否则进入同步队列以自旋方式获取锁
  2. if(!tryAcquire(arg)){
  3. //自旋方式获取锁
  4. for (;;) {
  5. //2.若当前节点的前驱节点为head, 则再次尝试获取锁
  6. if(node.prev == head && tryAcquire(arg)){
  7. //3.获取锁后重设head,共享模式下需要传播唤醒后继节点
  8. setHead(node);
  9. }
  10. //4.获取锁失败,则安全更新前驱节点的waitStatus的值为SINGAL并对当前节点的线程进行阻塞
  11. CAS(node,waitStatus,SINGAL) && park(node)
  12. }
  13. }

此时我们可以先关注四个点:state、队列头节点、节点的waitSatus、阻塞当前线程

解锁

AQS中解锁的基本流程如下(以独占模式为例):

- 论文版:

  1. //更新同步状态
  2. update synchronization state;
  3. //判断同步状态是否可取
  4. if (state may permit a blocked thread to acquire)
  5. //同步状态可取则从同步队列中释放阻塞线程
  6. unblock one or more queued threads;

- 实现版:

  1. //1.tryRelease会CAS更新State为0,成功返回true
  2. if(tryRelease(arg)){
  3. //2.若是SIGNAL,则将waitStatus回归为0同时唤醒后继节点
  4. if (node.waitStatus != 0)
  5. CAS(node,waitStatus,0) && unpark(node.next);
  6. }

此时我们可以先关注四个点:state、出队、节点的waitStatus、唤醒后继线程

独占与共享

AQS同时支持两种模式,分别是独占模式和共享模式:

独占模式: 即只有一个线程能持有锁(单资源,排他性),AQS提供「acquire」「release」方法

共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireShared」「releaseShared」方法

中断与超时

AQS同时新增对中断和超时的响应支持,同时也区分独占模式和共享模式:

独占模式: 即只有一个线程能持有锁(单资源),AQS提供「acquireInterruptibly」「tryAcquireNanos」方法

共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireSharedInterruptibly」「tryAcquireSharedNanos」方法

AQS使用

自定义同步器只需实现「State」的获取和释放即可,即如下模板方法,状态维护和队列管理等已由AQS实现:

实现独占: 「tryAcquire」「tryRelease」「isHeldExclusively」

实现共享: 「tryAcquireShared」「tryReleaseShared」

补充: 在使用时使用者只需要实现独占和共享的其中一种即可(如「ReentrantLock」),当然也支持两者都实现(如「ReentrantReadWriteLock」)

AQS构成

类定义

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable

构造器

  1. protected AbstractQueuedSynchronizer() { }

重要变量

  1. /**
  2. * CLH锁同步队列的头节点,延迟初始化(懒加载)
  3. * 除了初始化,其他时刻只能被setHead进行更新
  4. * 注意:当head存在时,waitStatus不能是CANCELLED状态
  5. */
  6. private transient volatile Node head;
  7. /**
  8. * CLH锁同步队列的尾节点,延迟初始化(懒加载)
  9. * 只能在入队时新增一个Node时进行更新
  10. */
  11. private transient volatile Node tail;
  12. /**
  13. * 同步状态 - volatile保证其可见性
  14. */
  15. private volatile int state;

内部类

AQS内包含两个内部类,分别是:

Node: 每个线程会被封装成一个Node节点,其中会记录线程、节点状态和前后节点等信息,详情请参见3.Node节点

ConditionObject: 条件变量,用于实现管程形式的条件控制,详情请参见并发番@AbstractQueuedSynchronized一文通

UnSafe

作用: 用于提供CAS原子更新操作

  1. private static final Unsafe unsafe = Unsafe.getUnsafe();
  2. private static final long stateOffset;
  3. private static final long headOffset;
  4. private static final long tailOffset;
  5. private static final long waitStatusOffset;
  6. private static final long nextOffset;
  7. static {
  8. try {
  9. stateOffset = unsafe.objectFieldOffset
  10. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  11. headOffset = unsafe.objectFieldOffset
  12. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  13. tailOffset = unsafe.objectFieldOffset
  14. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  15. waitStatusOffset = unsafe.objectFieldOffset
  16. (Node.class.getDeclaredField("waitStatus"));
  17. nextOffset = unsafe.objectFieldOffset
  18. (Node.class.getDeclaredField("next"));
  19. } catch (Exception ex) { throw new Error(ex); }
  20. }
  21. /**
  22. * CAS原子更新head节点,仅用于同步队列的enq入队操作
  23. */
  24. private final boolean compareAndSetHead(Node update) {
  25. return unsafe.compareAndSwapObject(this, headOffset, null, update);
  26. }
  27. /**
  28. * CAS原子更新tail节点,仅用于同步队列的enq入队操作
  29. */
  30. private final boolean compareAndSetTail(Node expect, Node update) {
  31. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  32. }
  33. /**
  34. * CAS更新原子更新Node节点的waitStatus变量
  35. */
  36. private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
  37. return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
  38. }
  39. /**
  40. * CAS next field of a node.原子更新Node节点的next变量,仅用于cancelAcquire取消获取操作
  41. */
  42. private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
  43. return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  44. }

Node节点

节点概述

待入队线程会在入队前被封装成一个Node节点,其中会记录线程、节点状态和前后节点等信息:

作用: AQS框架的变种CLH锁借由Node组成的「FIFO双向链表队列」实现

链接: 每个Node通过「pred」链接其前驱节点,通过「next」链接其后继节点

条件支持: 每个Node同时会通过「nextWaiter」提供对「Condition」的支持

模式: 每个Node都可以支持独占「EXCLUSIVE」共享「SHARED」模式

初始化: CLH锁只有在第一次入队时(即第一次出现竞争时)会初始化「Head」「Tail」,主要是性能考究(默认少竞争)

节点类

  1. static final class Node {
  2. /** 标记共享模式 */
  3. static final Node SHARED = new Node();
  4. /** 标记独占模式 */
  5. static final Node EXCLUSIVE = null;
  6. /** 标记节点被取消 */
  7. static final int CANCELLED = 1;
  8. /** 标记后继节点需要被唤醒 */
  9. static final int SIGNAL = -1;
  10. /** 标记节点位于条件阻塞队列中 */
  11. static final int CONDITION = -2;
  12. /**
  13. * 标记共享模式下,节点共享状态正在被传播(acquireShared)
  14. * 当前节点获得锁或释放锁时, 共享模式下节点的最终状态是 PROPAGATE
  15. */
  16. static final int PROPAGATE = -3;
  17. /**
  18. * !!!重中之重!!!
  19. * 标记节点状态,默认为0,负数无须唤醒,使用CAS原子更新
  20. * 独占模式:SIGNAL、CANCEL、0
  21. * 共享模式:SIGNAL、CANCEL、PROPAGATE、0
  22. * 条件变量:CONDITION状态不会存在于CLH锁同步队列中,只用于条件阻塞队列
  23. */
  24. volatile int waitStatus;
  25. /**
  26. * 在CLH锁同步队列中链接前驱节点,使用CAS原子更新,每次入队和GC出队时会被指派
  27. * 当前驱节点被取消时,一定能找到一个未被取消的节点,因为Head节点永远不会被取消:头节点必须成功aquire
  28. * 被取消的线程不会再次成功aquire,线程只能取消自己不会影响其他
  29. * 主要作用是在循环中跳过CANCELLED状态的节点
  30. */
  31. volatile Node prev;
  32. /**
  33. * 在CLH锁同步队列中链接后继节点,每次入队、前驱节点被取消以及GC出队时被指派
  34. * 赋值操作非线程安全,next为null时并不意味着节点不存在后继节点
  35. * 当next不为null时,next是可靠的
  36. * 主要作用是在释放锁时对后继节点进行唤醒
  37. */
  38. volatile Node next;
  39. /** Node关联线程 */
  40. volatile Thread thread;
  41. /**
  42. * 链接位于条件阻塞队列的节点或特定SHARED值
  43. * 实际作用就是标记Node是共享模式还是独占模式
  44. * 独占模式时为null,共享模式时为SHARED
  45. * 在条件阻塞队列中指向下一个节点
  46. */
  47. Node nextWaiter;
  48. /**
  49. * 判断Node是否为共享模式
  50. * @Return true 是 false 不是
  51. */
  52. final boolean isShared() {
  53. //当是共享模式时,nextWaiter就是SHARED值,独占模式就是null
  54. return nextWaiter == SHARED;
  55. }
  56. /**
  57. * 返回前驱节点,当前驱节点为空时直接抛空指针异常(实际上Head永远不会为null)
  58. */
  59. final Node predecessor() throws NullPointerException {
  60. Node p = prev;
  61. //空指针判断只要是为了help gc
  62. if (p == null)
  63. throw new NullPointerException();
  64. else
  65. return p;
  66. }
  67. //默认共享模式
  68. Node() {}
  69. // Used by addWaiter 用于CLH锁同步队列
  70. Node(Thread thread, Node mode) {
  71. this.nextWaiter = mode;
  72. this.thread = thread;
  73. }
  74. // Used by Condition 用于条件阻塞队列
  75. Node(Thread thread, int waitStatus) {
  76. this.waitStatus = waitStatus;
  77. this.thread = thread;
  78. }
  79. }

节点状态

每个Node都有持有对应的「线程ID」,并通过int类型的「waitStatus」标记节点状态

SIGNAL(-1): 标记唤醒,当前节点被释放后必须唤醒后继节点

CANCELLED(1): 标记已取消,当Node因超时或中断被取消,取消状态不可变且对应线程不可再次阻塞

CONDITION(-2): 标记条件阻塞,即Node位于条件变量的阻塞队列中(或者说是条件阻塞队列)

PROPAGATE(-3): 标记传递中,仅用于标记位于同步队列的头节点,表示共享状态该正在被传递中

0: 默认为0,当为条件阻塞时默认为-2,非负数意味着无须唤醒,此值使用CAS原子更新

状态维护

「State」状态主要用于记录线程获取锁的次数,其实质就是可用资源数,所有操作的目的其实都是为了获得她的青睐

1.必须使用CAS对「State」状态进行原子更新

2.当「State」状态>0时,说明当前线程已持有该锁;当「State」状态=0时,说明当前线程无该锁

3.由于State可自增,因此可用于实现可重入,如「ReentrantLock.lock()」

4.独占模式中「State」的值最多为1 ,共享模式中「State」的值可以任意大(如「CountDownLatch」)

  1. /**
  2. * 同步状态
  3. */
  4. private volatile int state;
  5. /**
  6. * 返回当前同步状态,该操作具有volatile内存读语义 - 直接从主内存读取到最新值
  7. */
  8. protected final int getState() {
  9. return state;
  10. }
  11. /**
  12. * 设置当前同步状态,该操作具有volatile内存写语义 - 一旦变更直接刷新到主内存
  13. */
  14. protected final void setState(int newState) {
  15. state = newState;
  16. }
  17. /**
  18. * CAS更新当前同步状态,该操作同时具备volatile内存读-写语义
  19. * @param expect 期望值
  20. * @param update 新值
  21. * @return {@code true} 成功返回true,失败返回false(即实际值!=期望值)
  22. */
  23. protected final boolean compareAndSetState(int expect, int update) {
  24. // 底层调用通过Unsafe调用CAS
  25. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  26. }

同步队列

同步队列入队

同步队列入队主要涉及4个操作:

1.封装Node: 将当前线程封装成Node同时指定独占或共享模式

2.初始化: 当未初始化时,初始化头节点head和尾节点tail

3.队尾追加: 根据FIFO原则,新增节点会被追加到队尾

4.CAS更新tail: 通常将新增节点作为新的tail


小问:如何保证新增节点一定入队并且tail设置成功?
友情小提示:读者可以思考CAS结合重试的解决方案

小答:由于入队很可能发生并发竞争,为了处理这种情况,Doug Lea老师使用了经典的并发解决方案-
自旋CAS变更volatile变量:通过反复重试结合CAS的方式保证至少有一次能入队成功,一次不行就再来一次

  1. /**
  2. * 新增一个等待者
  3. * - 创建和入队一个新node
  4. * - 根据FIFO,新增的追加到队尾并被设置为tail
  5. *
  6. * @param mode Node.EXCLUSIVE for exclusive 独占, Node.SHARED for shared 共享
  7. * @return the new node 返回新Node
  8. */
  9. private Node addWaiter(Node mode) {
  10. //1.当前线程封装成Node,并指定独占或共享模式
  11. Node node = new Node(Thread.currentThread(), mode);
  12. //记录原tail -> 根据FIFO,新增的追加到队尾并被设置为tail
  13. Node pred = tail;
  14. /**
  15. * 当pred为null时,说明还未初始化,应该直接走enq方法完成初始化
  16. * if代码块是快速入队的一个优化:仅当CAS操作失败才会进入enq方法进行自旋
  17. */
  18. if (pred != null) {
  19. node.prev = pred;
  20. /**
  21. * CAS更新tail -> 将新增节点设置为tail
  22. * 允许CAS失败,如果失败直接进入enq,采用自旋方式入队
  23. */
  24. if (compareAndSetTail(pred, node)) {
  25. //将原tail节点的next链接指向当前节点
  26. pred.next = node;
  27. return node;
  28. }
  29. }
  30. //采用自旋方式入队
  31. enq(node);
  32. return node;
  33. }
  34. /**
  35. * 向队尾插入一个节点,未初始化(tail为null)时完成初始化
  36. * 必须CAS更新Head和Tail
  37. * @param node the node to insert 待插入节点
  38. * @return node's predecessor 待插入节点的前驱节点(即原tail)
  39. */
  40. private Node enq(final Node node) {
  41. //自旋方式入队
  42. for (;;) {
  43. Node t = tail;
  44. /**
  45. * 2.初始化 -> 当tail为null时就意味着队列为空
  46. * - 初始化时会同时生成head和tail
  47. * - 其中head一直作为dummy节点存在,不需要存储thread,
  48. * 但需要记录waitStatus字段,以作为唤醒后继节点的依据
  49. * - 其中tail一直作为实体节点存在,会存储thread
  50. */
  51. if (t == null) {
  52. if (compareAndSetHead(new Node()))
  53. tail = head;
  54. } else {
  55. /**
  56. * 3.新节点追加到队尾,关于prev有三个重要论点:
  57. * 一.Node使用prev(前驱节点)作为形成链的根本依据
  58. * 二.当节点位于同步队列中,prev一定非空
  59. * 三.但prev非空并不意味着节点位于同步队列中
  60. * 因为发生竞争时CAS更新tail是允许失败的,一旦CAS失败就再自旋一次
  61. * 当CAS更新tail失败时,由于节点只是将prev指向tail但并没有设置tail成功
  62. * 此时并不能算作真正的入队(原因在于后面获取和释放操作都是基于tail的)
  63. */
  64. node.prev = t;
  65. /**
  66. * 4.CAS更新tail,关于next有三个重要论点:
  67. * 一.Node使用next作为形成链的一种优化辅助手段
  68. * 二.当next非空,节点一定存在同步队列中
  69. * 三.但节点存在同步队列时,next不一定非空
  70. */
  71. if (compareAndSetTail(t, node)) {
  72. t.next = node;
  73. return t;
  74. }
  75. }
  76. }
  77. }

小问:为神马说next是非线程安全的?有什么隐患吗?
友情小提示:读者可以从tail的更新时机角度考虑

小答:有心的读者会发现在执行CAS更新tail成功之后才会执行t.next=node,此时节点已经真正入队;
但问题是在并发情况下,由于t.next=node非CAS操作,因此是非线程安全的;
但由于后续操作是依赖于tail的,next更多是个优化,因此即使非安全也没关系

小问:为神马要加入一个 dummy 节点(head)?
友情小提示:读者可以从CLH锁变种考虑

小答:原因是同步队列是CLH锁的一个变种
1.线程节点能否获取锁的判断就是通过其前继节点的状态,当前节点若想获取锁需要给前驱节点设置为SIGNAL状态,作用是当前驱节点释放锁后能通知其后继节点去获取锁
2.head节点用来表示当前已获得锁的节点,其无须存储线程,它的核心功能是作为必然存在的前驱节点通过记录waitStatus状态作为是否需要唤醒后继节点的判断依据

同步队列出队

独占模式成功获取锁后出队

独占模式成功获取锁出队会做三个操作:

1.设置Head: Head指向成功获取锁(即待出队)的节点,作为新的头节点

2.清空当前节点引用: 待出队的节点需要将thread和prev属性设置为null,help gc

3.清空前驱节点next: 由于当前节点需要被GC,因此也要清除其前驱节点的next

  1. /**
  2. * Sets head of queue to be node, thus dequeuing. Called only by
  3. * acquire methods. Also nulls out unused fields for sake of GC
  4. * and to suppress unnecessary signals and traversals.
  5. *
  6. * @param node the node 成功获取锁的节点(即待出队节点)
  7. */
  8. private void setHead(Node node) {
  9. // 1.head指向待出队节点
  10. head = node;
  11. // 2.清空当前节点引用
  12. node.thread = null;
  13. node.prev = null;
  14. }
  15. // 3.清空前驱节点next
  16. p.next = null;

小问:为神马要清空节点引用??
友情小提示:读者可以从Node的内部变量的作用以及head作用在同步队列中的作用这方面去考虑

小答:Head在同步队列中的定位就是作为待出队节点的一个状态记录点,以作为唤醒后继节点的依据

1.其核心在于waitStatus的值(当为SIGNAL-1时才会去唤醒后继节点),因此Node的其他属性并不重要,Head更多是虚拟节点的存在,只存储waitStatus即可,以作为唤醒后继的依据

2.同时清空thread、prev、prev.next有利于GC回收待出队的节点,因为该节点在给head设置完waitStatus之后就完成了使命,线程可以出队了


共享模式成功获取锁后出队

共享模式成功获取锁出队会做两个操作:

1.设置Head: 设置头节点,同时检测在共享模式下是否有后继者等待获取锁

2.向后传播唤醒: 如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒

  1. /**
  2. * Sets head of queue, and checks if successor may be waiting
  3. * in shared mode, if so propagating if either propagate > 0 or
  4. * PROPAGATE status was set.
  5. *
  6. * 共享模式成功获取锁出队
  7. * - 设置头节点并继续向后传播
  8. * 1.设置头节点,同时检测在共享模式下是否有后继者等待获取锁
  9. * 2.如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒
  10. *
  11. * @param node the node
  12. * @param propagate the return value from a tryAcquireShared
  13. */
  14. private void setHeadAndPropagate(Node node, int propagate) {
  15. //设置头节点
  16. Node h = head; // Record old head for check below
  17. setHead(node);
  18. /**
  19. * 这里做了很多逻辑或判断,主要目的就是为了判断是否还有剩余资源以唤醒后续线程节点
  20. * 1.propagate > 0 :意味着还有剩余资源(state>0),共享时当然需要继续唤醒
  21. * 2.h == null : 头节点为空(这个比较诡异-we don't know, because it appears null)
  22. * 3.h.waitStatus <0 : 头节点状态为负数(尤其是PROPAGATE),说明需要唤醒后继节点
  23. *
  24. * 注意: 这些保守的检查,在竞争环境下获取/释放锁可能会导致不必要的多次唤醒
  25. */
  26. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  27. (h = head) == null || h.waitStatus < 0) {
  28. Node s = node.next;
  29. //注意:一旦遇到节点是独占模式,即使propagate>0也会停止往后传播啦
  30. if (s == null || s.isShared())
  31. // 唤醒后继结点
  32. doReleaseShared();
  33. }
  34. }

小问:何时会出现h==null的情况?
友情小提示:这个可能需要测试极端环境,如果有读者有想法的欢迎留言指导笔者一二


因中断/超时获取锁失败后出队

友情小提示:建议在看完aquire操作之后再回看此章

因中断/超时而放弃获取锁失败的出队会做四个操作:

1.清空线程引用:清空当前节点的线程引用,便于当前节点出队和GC

2.寻找非取消状态前驱节点:沿prev反向遍历直到找到一个非取消状态前驱节点

3.设置CANCELLED状态:核心步骤,失败的节点需要设置为取消状态

4.清除、重置、唤醒:见步骤4-8

  1. /**
  2. * Cancels an ongoing attempt to acquire.
  3. *
  4. * 清除因中断/超时而放弃获取lock的线程节点
  5. *
  6. * @param node the node
  7. */
  8. private void cancelAcquire(Node node) {
  9. // Ignore if node doesn't exist
  10. if (node == null)
  11. return;
  12. // 1.清空线程引用
  13. node.thread = null;
  14. // Skip cancelled predecessors 跳过所有取消状态前驱节点
  15. Node pred = node.prev;
  16. // 2.沿prev反向遍历直到找到一个非取消状态前驱节点,同时也顺便清除途中出现的取消状态节点
  17. while (pred.waitStatus > 0)
  18. node.prev = pred = pred.prev;
  19. // 注意predNext也是需要清除的
  20. Node predNext = pred.next;
  21. /**
  22. * 3. 节点状态需要设置为CANCELLED,即标记为可回收的无用节点
  23. * CANCELLED状态的节点之后除了被清除不会再参与任何操作,等同于"垃圾"了
  24. */
  25. node.waitStatus = Node.CANCELLED;
  26. /**
  27. * 4.若需要清除的节点恰好是尾节点,需要将前驱节点CAS设置为新的尾节点
  28. * 允许CAS失败,一旦失败会转而执行步骤6或8
  29. */
  30. if (node == tail && compareAndSetTail(node, pred)) {
  31. /**
  32. * 5.删除节点predNext -> 因为当前节点已经无用了(断链)
  33. * 允许CAS失败即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
  34. */
  35. compareAndSetNext(pred, predNext, null);
  36. } else {
  37. int ws;
  38. /**
  39. * 6.这里会对next重置,同时会涉及多个判断,主要由三个逻辑与条件组成:
  40. * 一.前驱节点非头节点 -> 步骤8会有解释
  41. * 二.前驱节点为SIGNAL状态 或 非取消时CAS设置为SINGAL成功 -> 即最终前驱节点必须是SIGNAL
  42. * 三.前驱节点的线程非空 -> 线程若为空,那么问题来了,谁想获取锁,卧底吗???
  43. * 补充一点:head是不会储存thread的,因此之前会有个pred!=head的判断
  44. */
  45. if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
  46. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  47. pred.thread != null) {
  48. /**
  49. * 7.若后继节点waitStatus非取消状态,说明后继节点是想获取锁的
  50. * 此时需要将next重置,主要目的是断开与当前节点的链接,建立新的链接
  51. * 允许CAS失败,即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
  52. */
  53. Node next = node.next;
  54. if (next != null && next.waitStatus <= 0)
  55. compareAndSetNext(pred, predNext, next);
  56. } else {
  57. // 8.若前驱节点正好是head节点或是取消状态,进入唤醒步骤,具体参见8.4 unparkSuccessor
  58. unparkSuccessor(node);
  59. }
  60. node.next = node; // help GC
  61. }
  62. }

独占模式

独占式获取锁

独占式获取锁总共有三种方式:

1.不响应中断获取锁-acquire: 不响应中断指的是线程获取锁时被中断后能被重新唤醒并继续获取锁,在方法返回后会根据中断状态决定是否重新设置中断

2.响应中断获取锁-acquireInterruptibly: 响应中断指的是当线程获取锁时被中断会立即抛出异常,获取失败

3.响应中断和超时获取锁-tryAcquireNanos-: 处理方式等同响应中断获取,区别是多了超时后直接返回fasle,获取失败


(核心重点)独占式如何成功获取锁:
1.调用tryAcquire方法成功时才能成功获取锁
2.其他所有手段(比如同步队列、CAS自旋volatile变量等)全部是为了辅助调用tryAcquire方法
3.tryAcquire方法中会通过对state进行CAS操作判断是否能够获取锁,即获取锁的根源在于state的值

不响应中断获取锁

不响应中断主要遵循如下四步:

1.tryAcquire: 初次调用子类自实现的tryAcquire方法获取锁,成功即获得锁,否则进入第二步

2.addWaiter: 当前线程封装为独占Node并进入同步队列,等待前驱节点的SIGNAL状态并进入第三步

3.acquireQueued: 自旋获取锁直到调用tryAquire成功获取锁为止(前驱节点为Head时就会尝试调用tryAcquire),自旋过程中可能多次阻塞和解除阻塞,值得注意的是park是进入等待状态

4.selfInterrupt: 若线程获取锁途中被中断,当成功获取锁后,由于中断状态被中途清除,需要补中断状态

  1. /**
  2. * Acquires in exclusive mode, ignoring interrupts. Implemented by invoking
  3. * at least once {@link #tryAcquire},returning on success.
  4. * Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
  5. * invoking {@link #tryAcquire} until success.
  6. * This method can be used to implement method {@link Lock#lock}.
  7. *
  8. * 独占模式获取锁,不响应中断
  9. * 1.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
  10. * 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
  11. * 3.该方法可用于实现Lock接口的lock方法
  12. *
  13. * @param arg the acquire argument. This value is conveyed to
  14. * {@link #tryAcquire} but is otherwise uninterpreted and
  15. * can represent anything you like.
  16. */
  17. public final void acquire(int arg) {
  18. //若初次调用tryAcquire失败需要封装成独占Node并加入到同步队列中
  19. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  20. //若线程成功获取锁但之前已被中断,由于中断状态被中途清除,需要再次设置中断状态
  21. selfInterrupt();
  22. }
  23. /**
  24. * Acquires in exclusive uninterruptible mode for thread already in
  25. * queue. Used by condition wait methods as well as acquire.
  26. *
  27. * 独占式不响应中断获取锁(当线程已在同步队列中)
  28. * 同时条件队列的wait方法也是使用该方法执行独占式不响应中断获取锁操作
  29. *
  30. * @param node the node 当前线程节点
  31. * @param arg the acquire argument 期望state状态值,通常是1
  32. * @return {@code true} if interrupted while waiting 当获取锁过程中被中断返回true
  33. */
  34. final boolean acquireQueued(final Node node, int arg) {
  35. //记录获取锁是否失败
  36. boolean failed = true;
  37. try {
  38. /**
  39. * 记录获取锁过程中是否被中断
  40. */
  41. boolean interrupted = false;
  42. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
  43. for (;;) {
  44. final Node p = node.predecessor();
  45. //前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
  46. if (p == head && tryAcquire(arg)) {
  47. //若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
  48. setHead(node);
  49. /**
  50. * 由于setHead中node.prev=null,这里将p.next = null
  51. * 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
  52. */
  53. p.next = null; // help GC
  54. failed = false;
  55. //返回获取锁途中是否被中断过
  56. return interrupted;
  57. }
  58. /**
  59. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  60. * 若能安心park,则进入等待状态,直到被unpark唤醒
  61. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  62. */
  63. if (shouldParkAfterFailedAcquire(p, node) &&
  64. parkAndCheckInterrupt())
  65. //途中哪怕只被中断一次也要设置中断为true
  66. interrupted = true;
  67. }
  68. } finally {
  69. //失败处理
  70. if (failed)
  71. cancelAcquire(node);
  72. }
  73. }

小问:为神马前驱节点为head时需要再次尝试获取锁呢?
友情小提示:读者可以从前驱节点为head时的锁获取情况去考虑

小答:因为当前驱节点为head时会有两种情况:
**1.前驱节点已成功获取锁并正在占用该锁,但可能很快释放**
**2.前继节点是空节点, 此时已经释放锁, 因此后继节点就有机会获取锁了**

响应中断获取锁

与不响应中断获取锁相比,响应中断获取锁只有两个区别:

1.当获取锁的过程中发生中断,立即抛出中断异常,然后进入finally处理失败

2.同时少了设置中断状态的步骤

  1. /**
  2. * Acquires in exclusive mode, aborting if interrupted.
  3. * Implemented by first checking interrupt status, then invoking
  4. * at least once {@link #tryAcquire}, returning on success.
  5. * Otherwise the thread is queued, possibly repeatedly
  6. * blocking and unblocking, invoking {@link #tryAcquire}
  7. * until success or the thread is interrupted. This method can be
  8. * used to implement method {@link Lock#lockInterruptibly}.
  9. *
  10. * 独占式响应中断获取锁
  11. * 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
  12. * 2.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
  13. * 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁或发生中断
  14. * 4.该方法可用于实现Lock接口的lockInterruptibly方法
  15. *
  16. * @param arg the acquire argument. This value is conveyed to
  17. * {@link #tryAcquire} but is otherwise uninterpreted and
  18. * can represent anything you like.
  19. * @throws InterruptedException if the current thread is interrupted
  20. */
  21. public final void acquireInterruptibly(int arg)
  22. throws InterruptedException {
  23. /**
  24. * 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
  25. * 注意静态方法会清除中断标识
  26. */
  27. if (Thread.interrupted())
  28. throw new InterruptedException();
  29. //若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
  30. if (!tryAcquire(arg))
  31. doAcquireInterruptibly(arg);
  32. }
  33. /**
  34. * Acquires in exclusive interruptible mode.
  35. *
  36. * 独占式响应中断超时获取锁
  37. *
  38. * @param arg the acquire argument
  39. */
  40. private void doAcquireInterruptibly(int arg)
  41. throws InterruptedException {
  42. //当前线程封装成Node
  43. final Node node = addWaiter(Node.EXCLUSIVE);
  44. //记录获取锁是否失败
  45. boolean failed = true;
  46. try {
  47. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
  48. for (;;) {
  49. final Node p = node.predecessor();
  50. //前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
  51. if (p == head && tryAcquire(arg)) {
  52. //若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
  53. setHead(node);
  54. /**
  55. * 由于setHead中node.prev=null,这里将p.next = null
  56. * 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
  57. */
  58. p.next = null; // help GC
  59. failed = false;
  60. return;
  61. }
  62. /**
  63. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  64. * 若能安心park,则进入等待状态,直到被unpark唤醒
  65. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  66. * 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
  67. */
  68. if (shouldParkAfterFailedAcquire(p, node) &&
  69. parkAndCheckInterrupt())
  70. throw new InterruptedException();
  71. }
  72. } finally {
  73. //失败处理(如线程中断)
  74. if (failed)
  75. cancelAcquire(node);
  76. }
  77. }

小问: 若 if (p == head && tryAcquire(arg)) { //恰好在执行到该方法内部时发生中断 },此时会如何处理中断?
友情小提示:读者可以从线程状态角度去考虑

小答:此时该中断会被忽略,原因是该线程目前是运行态,而运行态是不响应中断的
友情推荐:关于中断响应机制读者可参看笔者的 并发番@Thread一文通(1.7版)


响应超时与中断获取锁

与响应中断获取锁相比,响应超时与中断获取锁有三个区别:

1.若获取锁的过程中超过超时阈值,会先执行超时阻塞;否则会先再次自旋

2.一旦超时立即返回fasle,然后进入finally处理失败

3.有返回值返回是否获取锁,成功返回true,失败返回false
  1. /**
  2. * Attempts to acquire in exclusive mode, aborting if interrupted,
  3. * and failing if the given timeout elapses. Implemented by first
  4. * checking interrupt status, then invoking at least once {@link
  5. * #tryAcquire}, returning on success. Otherwise, the thread is
  6. * queued, possibly repeatedly blocking and unblocking, invoking
  7. * {@link #tryAcquire} until success or the thread is interrupted
  8. * or the timeout elapses. This method can be used to implement
  9. * method {@link Lock#tryLock(long, TimeUnit)}.
  10. *
  11. * 独占式响应中断超时获取锁
  12. * 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
  13. * 2.若获取锁超时会立即返回,即获取锁失败
  14. * 3.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
  15. * 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
  16. * 或发生中断或超时
  17. * 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
  18. *
  19. * @param arg the acquire argument. This value is conveyed to
  20. * {@link #tryAcquire} but is otherwise uninterpreted and
  21. * can represent anything you like.
  22. * @param nanosTimeout the maximum number of nanoseconds to wait
  23. * @return {@code true} if acquired; {@code false} if timed out
  24. * @throws InterruptedException if the current thread is interrupted
  25. */
  26. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  27. throws InterruptedException {
  28. /**
  29. * 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
  30. * 注意静态方法会清除中断标识
  31. */
  32. if (Thread.interrupted())
  33. throw new InterruptedException();
  34. /**
  35. * 若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
  36. * 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
  37. */
  38. return tryAcquire(arg) ||
  39. doAcquireNanos(arg, nanosTimeout);
  40. }
  41. /**
  42. * Acquires in exclusive timed mode.
  43. *
  44. * 独占式响应中断超时获取锁
  45. *
  46. * @param arg the acquire argument
  47. * @param nanosTimeout max wait time
  48. * @return {@code true} if acquired
  49. */
  50. private boolean doAcquireNanos(int arg, long nanosTimeout)
  51. throws InterruptedException {
  52. //超时时间必须有效
  53. if (nanosTimeout <= 0L)
  54. return false;
  55. //超时截止时间
  56. final long deadline = System.nanoTime() + nanosTimeout;
  57. //当前线程封装成独占模式Node
  58. final Node node = addWaiter(Node.EXCLUSIVE);
  59. //记录获取锁是否失败
  60. boolean failed = true;
  61. try {
  62. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
  63. for (;;) {
  64. final Node p = node.predecessor();
  65. //前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
  66. if (p == head && tryAcquire(arg)) {
  67. //若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
  68. setHead(node);
  69. /**
  70. * 由于setHead中node.prev=null,这里将p.next = null
  71. * 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
  72. */
  73. p.next = null; // help GC
  74. failed = false;
  75. //成功获取锁需要返回成功标识,通知外部调用成功
  76. return true;
  77. }
  78. //剩余时间
  79. nanosTimeout = deadline - System.nanoTime();
  80. //超时立即返回false,同时会在finally中执行失败处理
  81. if (nanosTimeout <= 0L)
  82. return false;
  83. /**
  84. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  85. * 若能安心park,则进入等待状态,直到被unpark唤醒
  86. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  87. * 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
  88. */
  89. if (shouldParkAfterFailedAcquire(p, node) &&
  90. nanosTimeout > spinForTimeoutThreshold)
  91. LockSupport.parkNanos(this, nanosTimeout);
  92. //若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
  93. if (Thread.interrupted())
  94. throw new InterruptedException();
  95. }
  96. } finally {
  97. //失败处理(如线程中断或超时)
  98. if (failed)
  99. cancelAcquire(node);
  100. }
  101. }

小问:为神马要设置超时阈值?
友情小提示:读者可能从锁优化角度考虑

小答:在剩余时间(nanosTimeout)小于超时阈值(spinForTimeoutThreshold)时,自旋的效率比LockSupport.park更高且开销更少


独占式释放

独占式释放锁遵循如下步骤:

1.调用子类的tryRelease释放锁资源,若有重复锁需要完全释放

2.当head的waitStatus状态非0,意味着同步队列为空,需要尝试唤醒同步队列中的下一个等待唤醒的线程

  1. /**
  2. * Releases in exclusive mode. Implemented by unblocking one or
  3. * more threads if {@link #tryRelease} returns true.
  4. * This method can be used to implement method {@link Lock#unlock}.
  5. *
  6. * 独占模式释放锁,成功释放依据是tryRelease返回true
  7. * 该方法被用来实现Lock.unlock方法
  8. *
  9. * @param arg the release argument. This value is conveyed to
  10. * {@link #tryRelease} but is otherwise uninterpreted and
  11. * can represent anything you like.
  12. * @return the value returned from {@link #tryRelease}
  13. */
  14. public final boolean release(int arg) {
  15. //调用tryRelease判断是否已成功完全释放锁 --即Status是否被CAS更新为0
  16. if (tryRelease(arg)) {
  17. Node h = head;
  18. /**
  19. * 当head的waitStatus状态非0,意味着同步队列为空
  20. * 需要尝试唤醒同步队列中的下一个等待唤醒的线程
  21. */
  22. if (h != null && h.waitStatus != 0)
  23. //唤醒同步队列中的下一个等待唤醒的线程
  24. unparkSuccessor(h);
  25. return true;
  26. }
  27. return false;
  28. }

分情况讨论:
1.队列为空:直接返回,不走unpark
2.队列非空:
2-1 head刚初始化此时waitstatus = 0,且此时新节点尚未入队(enq尚未结束) ,不走unpark
2-2 新节点入队完成(enq结束),(进入aquireQueue)但第一次自旋时就拿到锁,此时head的waitstatus = 0 且setHead之后仍为0,不走unpark

  2-3 新节点入队完成,但第一次自旋后拿不到锁,则自身的会waitstaus变更为-1(shouldParkAfterFailedAcquire);若在调用should方法之前触发release,由于在should中cas为-1后返回false导致在aquireQueue中继续自旋
  2-4 新节点入队完成&&head的waitstatus=-1,但此时重新自旋获取锁仍然失败,若should发生在casStatus之前则直接返回true否则重新cas为-1并自旋, 此时至多消耗一次无用的unpark操作;若should发生在casStatus之后,

共享模式

共享式获取锁

共享式获取锁也有三种方式,与独占式保持一致:

1.不响应中断获取锁-acquireShared

2.响应中断获取锁-acquireSharedInterruptibly

3.响应中断和超时获取锁-tryAcquireSharedNanos


共享式和独占式获取锁原理基本一致,主要区别在于:

1.子类需要实现共享方式获取锁tryAcquireShared

2.节点出队方式变更,当获取锁成功时前驱节点出队时会传播唤醒操作

不响应中断获取锁

不响应中断主要遵循如下两步:

1.tryAcquireShared: 先调用子类自实现的tryAcquireShared方法获取锁,成功即获得锁,否则进入第二步

2.doAcquireShared: 当前线程封装为共享Node并进入同步队列,自旋+共享方式获取锁

  1. /**
  2. * Acquires in shared mode, ignoring interrupts. Implemented by
  3. * first invoking at least once {@link #tryAcquireShared},
  4. * returning on success. Otherwise the thread is queued, possibly
  5. * repeatedly blocking and unblocking, invoking {@link
  6. * #tryAcquireShared} until success.
  7. *
  8. * 共享模式获取锁,不响应中断
  9. * 1.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
  10. * 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
  11. *
  12. * @param arg the acquire argument. This value is conveyed to
  13. * {@link #tryAcquireShared} but is otherwise uninterpreted
  14. * and can represent anything you like.
  15. */
  16. public final void acquireShared(int arg) {
  17. /**
  18. * 与独占模式的核心区别之一:
  19. * 通过调用子类实现的tryAcquireShared获取锁
  20. * tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
  21. */
  22. if (tryAcquireShared(arg) < 0)
  23. doAcquireShared(arg);
  24. }
  25. /**
  26. * Acquires in shared uninterruptible mode.
  27. *
  28. * 共享模式获取锁,不响应中断
  29. *
  30. * @param arg the acquire argument
  31. */
  32. private void doAcquireShared(int arg) {
  33. //当前线程封装为共享Node并进入同步队列,等待前驱节点的SIGNAL状态
  34. final Node node = addWaiter(Node.SHARED);
  35. //记录获取锁是否失败
  36. boolean failed = true;
  37. try {
  38. /**
  39. * 记录获取锁过程中是否被中断
  40. * 注意提供中断响应的方法没有该变量,而是选择直接抛出中断异常
  41. */
  42. boolean interrupted = false;
  43. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁
  44. for (;;) {
  45. final Node p = node.predecessor();
  46. if (p == head) {
  47. //前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
  48. int r = tryAcquireShared(arg);
  49. if (r >= 0) {
  50. /**
  51. * 与独占模式的核心区别之二:
  52. * 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
  53. * 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
  54. */
  55. setHeadAndPropagate(node, r);
  56. /**
  57. * 由于setHeadAndPropagate中node.prev=null,这里将p.next = null
  58. * 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
  59. */
  60. p.next = null;
  61. /**
  62. * 与独占模式的核心区别之三:
  63. * 获取锁途中一旦被中断,直接设置中断标识为true
  64. */
  65. if (interrupted)
  66. selfInterrupt();
  67. failed = false;
  68. return;
  69. }
  70. }
  71. /**
  72. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  73. * 若能安心park,则进入等待状态,直到被unpark唤醒
  74. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  75. */
  76. if (shouldParkAfterFailedAcquire(p, node) &&
  77. parkAndCheckInterrupt())
  78. interrupted = true;
  79. }
  80. } finally {
  81. //失败处理
  82. if (failed)
  83. cancelAcquire(node);
  84. }
  85. }

小问:为神马共享式不响应中断获取锁可以直接设置中断,而独占式却是返回中断状态(核心区别之三)?
友情小提示:读者可以用selfInterrupt()方法的复用情况方面考虑

小答:这涉及到条件队列的部分内容,条件队列只能用于独占模式(因为使用条件队列的前提就是先获取到锁 -- 管程要求),而在条件队列的多个方法中会根据判断条件决定是否要执行selfInterrupt()方法,因此在共享模式中可以直接中断,而独占需要返回中断状态告知独占或条件队列是否执行selfInterrupt()方法


响应中断获取锁

  1. /**
  2. * Acquires in shared mode, aborting if interrupted. Implemented
  3. * by first checking interrupt status, then invoking at least once
  4. * {@link #tryAcquireShared}, returning on success. Otherwise the
  5. * thread is queued, possibly repeatedly blocking and unblocking,
  6. * invoking {@link #tryAcquireShared} until success or the thread
  7. * is interrupted.
  8. * 共享式响应中断获取锁
  9. * 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
  10. * 2.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
  11. * 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁或发生中断
  12. * 4.该方法可用于实现Lock接口的lockInterruptibly方法
  13. * @param arg the acquire argument.This value is conveyed to
  14. * {@link #tryAcquireShared} but is otherwise uninterpreted
  15. * and can represent anything you like.
  16. *
  17. * @throws InterruptedException if the current thread is interrupted
  18. */
  19. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  20. /**
  21. * 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
  22. * 注意静态方法会清除中断标识
  23. */
  24. if (Thread.interrupted())
  25. throw new InterruptedException();
  26. /**
  27. * 与独占模式的核心区别之一:
  28. * 通过调用子类实现的tryAcquireShared获取锁
  29. * tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
  30. */
  31. if (tryAcquireShared(arg) < 0)
  32. doAcquireSharedInterruptibly(arg);
  33. }
  34. /**
  35. * Acquires in shared interruptible mode.
  36. * @param arg the acquire argument
  37. */
  38. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  39. //当前线程封装成共享Node
  40. final Node node = addWaiter(Node.SHARED);
  41. //记录获取锁是否失败
  42. boolean failed = true;
  43. try {
  44. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
  45. for (;;) {
  46. final Node p = node.predecessor();
  47. if (p == head) {
  48. //前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
  49. int r = tryAcquireShared(arg);
  50. if (r >= 0) {
  51. /**
  52. * 与独占模式的核心区别之二:
  53. * 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
  54. * 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
  55. */
  56. setHeadAndPropagate(node, r);
  57. p.next = null; // help GC
  58. failed = false;
  59. return;
  60. }
  61. }
  62. /**
  63. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  64. * 若能安心park,则进入等待状态,直到被unpark唤醒
  65. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  66. * 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
  67. */
  68. if (shouldParkAfterFailedAcquire(p, node) &&
  69. parkAndCheckInterrupt())
  70. throw new InterruptedException();
  71. }
  72. } finally {
  73. //失败处理(如线程中断)
  74. if (failed)
  75. cancelAcquire(node);
  76. }
  77. }

响应超时与中断获取锁

  1. /**
  2. * Attempts to acquire in shared mode, aborting if interrupted, and
  3. * failing if the given timeout elapses. Implemented by first
  4. * checking interrupt status, then invoking at least once {@link
  5. * #tryAcquireShared}, returning on success. Otherwise, the
  6. * thread is queued, possibly repeatedly blocking and unblocking,
  7. * invoking {@link #tryAcquireShared} until success or the thread
  8. * is interrupted or the timeout elapses.
  9. *
  10. * 共享式响应中断超时获取锁
  11. * 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
  12. * 2.若获取锁超时会立即返回,即获取锁失败
  13. * 3.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
  14. * 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
  15. * 或发生中断或超时
  16. * 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
  17. *
  18. * @param arg the acquire argument. This value is conveyed to
  19. * {@link #tryAcquireShared} but is otherwise uninterpreted
  20. * and can represent anything you like.
  21. * @param nanosTimeout the maximum number of nanoseconds to wait
  22. * @return {@code true} if acquired; {@code false} if timed out
  23. * @throws InterruptedException if the current thread is interrupted
  24. */
  25. public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
  26. /**
  27. * 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
  28. * 注意静态方法会清除中断标识
  29. */
  30. if (Thread.interrupted())
  31. throw new InterruptedException();
  32. /**
  33. * 若初次调用tryAcquireShared失败需要封装成独占Node并进入同步队列
  34. * 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
  35. */
  36. return tryAcquireShared(arg) >= 0 ||
  37. doAcquireSharedNanos(arg, nanosTimeout);
  38. }
  39. /**
  40. * Acquires in shared timed mode.
  41. *
  42. * 共享式响应中断超时获取锁
  43. *
  44. * @param arg the acquire argument
  45. * @param nanosTimeout max wait time
  46. * @return {@code true} if acquired
  47. */
  48. private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
  49. //超时时间必须有效
  50. if (nanosTimeout <= 0L)
  51. return false;
  52. //超时截止时间
  53. final long deadline = System.nanoTime() + nanosTimeout;
  54. //当前线程封装成共享模式Node
  55. final Node node = addWaiter(Node.SHARED);
  56. //记录获取锁是否失败
  57. boolean failed = true;
  58. try {
  59. //自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
  60. for (;;) {
  61. final Node p = node.predecessor();
  62. //注意传播也是从head开始一个个唤醒的,不是一次性的(只不过快到好比一次性)
  63. if (p == head) {
  64. //前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
  65. int r = tryAcquireShared(arg);
  66. if (r >= 0) {
  67. /**
  68. * 与独占模式的核心区别之二:
  69. * 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
  70. * 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
  71. */
  72. setHeadAndPropagate(node, r);
  73. /**
  74. * 由于setHead中node.prev=null,这里将p.next = null
  75. * 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
  76. */
  77. p.next = null; // help GC
  78. failed = false;
  79. //成功获取锁需要返回成功标识,通知外部调用成功
  80. return true;
  81. }
  82. }
  83. //剩余时间
  84. nanosTimeout = head || rline - System.nanoTime();
  85. //超时立即返回false,同时会在finally中执行失败处理
  86. if (nanosTimeout <= 0L)
  87. return false;
  88. /**
  89. * 看看能不能安心park,不能的话再来一趟自旋,不怕累
  90. * 若能安心park,则进入等待状态,直到被unpark唤醒
  91. * 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
  92. * 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
  93. */
  94. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckI
  95. nanosTimeout > spinForTimeoutThreshold)
  96. LockSupport.parkNanos(this, nanosTimeout);
  97. //若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
  98. if (Thread.interrupted())
  99. throw new InterruptedException();
  100. }
  101. }
  102. } finally {
  103. //失败处理(如线程中断或超时)
  104. if (failed)
  105. cancelAcquire(node);
  106. }
  107. }

共享式释放

共享式释放相比独占式来说有个特色:

当同步队列中存在多个共享节点且资源够数时可能会并发地唤醒后继节点,因为共享模式下获取锁后就近唤醒后继节点

  1. /**
  2. * Releases in shared mode. Implemented by unblocking one or more
  3. * threads if {@link #tryReleaseShared} returns true.
  4. *
  5. * 共享模式释放锁
  6. * 实现原理是tryReleaseShared返回true时解除一个或多个线程的阻塞
  7. *
  8. * @param arg the release argument. This value is conveyed to
  9. * {@link #tryReleaseShared} but is otherwise uninterpreted
  10. * and can represent anything you like.
  11. * @return the value returned from {@link #tryReleaseShared}
  12. */
  13. public final boolean releaseShared(int arg) {
  14. if (tryReleaseShared(arg)) {
  15. doReleaseShared();
  16. return true;
  17. }
  18. return false;
  19. }
  20. /**
  21. * Release action for shared mode -- signals successor and ensures
  22. * propagation. (Note: For exclusive mode, release just amounts
  23. * to calling unparkSuccessor of head if it needs signal.)
  24. */
  25. private void doReleaseShared() {
  26. //自旋方式释放锁
  27. for (;;) {
  28. Node h = head;
  29. //队列不为空且有后继节点
  30. if (h != null && h != tail) {
  31. int ws = h.waitStatus;
  32. //唤醒前提都是Head节点状态为SIGNAL
  33. if (ws == Node.SIGNAL) {
  34. //只有当CAS回归纯真成功时才选择去唤醒后继节点
  35. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  36. continue; //(1) loop to recheck cases
  37. unparkSuccessor(h);
  38. //之所以要设置PROPAGATE主要是区别于独占模式的0,以告知到时要用传播方式进行后续处理
  39. } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  40. continue; //(2) loop on failed CAS
  41. }
  42. //如果head没变化,说明没有唤醒的线程啦,功成名就
  43. if (h == head) //(3) loop if head changed
  44. break;
  45. //其他情况,比如head节点无变化、ws为取消状态等,直接跳过不理会就好
  46. }
  47. }

小问:共享模式为何采用自旋方式释放锁?
友情小提示:遇到CAS操作,我们都需要考虑三个点:1.为何此处用CAS 2.若CAS成功呢? 3.若CAS失败呢?
补充小提示:CAS之所以会失败,通常都是发生了竞争,当然也可能是异常、中断等,因此失败原因很重要


线程中断、阻塞、唤醒

shouldParkAfterFailedAcquire

当线程无法获取锁时,会先通过该方法处理三种状态情况:

1.前驱节点状态为SIGNAL: 一定会唤醒后继节点,直接返回true,说明可以安心park

2.前驱节点状态为CANCELLED: 需要反向找到一个非取消状态节点作为新前驱节点,返回false,此时不允许park

3.前驱节点为0或PROPAGATE(共享): 由于前驱节点和当前节点都存在,因此存在前驱节点需要唤醒后继节点的必要性,因此前驱节点需要被设置为SIGNAL状态

  1. /**
  2. * 当节点没有获取到锁时,通过检查和更新waitStatus,
  3. * 以判断线程是否可以安心park进入等待状态
  4. *
  5. * @param pred 前驱节点
  6. * @param node the node 当前节点
  7. * @return {@code true} 线程应被阻塞时返回true
  8. */
  9. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  10. /**
  11. * 线程能否安心park进入等待状态取决于前驱节点的waiStatus值
  12. * 换句话说,前驱节点的waiStatus值决定了当前节点的'命'!
  13. */
  14. int ws = pred.waitStatus;
  15. //1.waitStatus == Node.SIGNAL 待唤醒状态
  16. if (ws == Node.SIGNAL)
  17. /**
  18. * 只有前驱节点为SIGNAL状态,当前线程才能安心被park
  19. * 否则前驱节点在获取锁之后是不会通知当前线程的,
  20. * 当前线程一脸懵逼的哭死在无限等待中?!
  21. */
  22. return true;
  23. //2.waitStatus >0 即 waitStatus == Node.CANCELLED == 1 取消状态
  24. //取消状态时需要重选一个非取消状态的前驱节点
  25. if (ws > 0) {
  26. /**
  27. * 沿prev反向遍历直到找到一个非取消状态的前驱节点
  28. *
  29. * 小问:那么中间那些取消状态的节点怎么办??
  30. * 小答:笑话,GC老爷子怎么会忘记回收在队列中
  31. * 这些'在其位不谋其政'(放弃获取锁但仍在队列中)的'垃圾们'呢 -
  32. * 正经说法:由于前塞动作,已放弃的节点相当于形成一条无引用链,不久就会被GC
  33. */
  34. do {
  35. //前塞动作 已放弃获取锁的取消状态节点会形成一条无引用链
  36. node.prev = pred = pred.prev;
  37. } while (pred.waitStatus > 0);
  38. //注意不要忘记将新的前驱节点的next指向当前节点(相爱相杀不能忘)
  39. pred.next = node;
  40. } else {
  41. //3.waitStatus == Node.PROPAGATE || 0
  42. /**
  43. * 为了后继节点能够安心park,需要将前驱节点设置为SIGNAL状态
  44. * 这样当前驱节点拿到锁了就可以通知后驱节点(即当前线程):后面那个,差不多该醒啦!
  45. * 注意:该方法不存在Node.CONDITION的情况
  46. */
  47. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  48. }
  49. //前驱节点非Node.SIGNAL都不应park
  50. return false;
  51. }

小问:为神马compareAndSetWaitStatus(pred, ws, Node.SIGNAL)后仍要返回false,而不是直接返回true?
友情小提示:读者可以从自旋意义这一方面入手考虑

小答:其实从首先判断 ws==Node.SIGNAL我们就可以一窥一二,理由有三:
1.CAS是允许失败的,当CAS失败时说明前驱节点已经出现了问题(比如刚释放完),很明显是不安全的
2.即使CAS失败也可补救,因此搭配自旋,大不了再自旋一轮嘛,多大点事!确保安全第一位嘛!
3.当CAS成功后,结合首先判断ws==Node.SIGNAL,我们可以确保在安全环境下快速返回,提高效率

核心重点:该方法根本作用就是要确保成功地设置前驱节点的SIGNAL状态,以确保前驱节点释放锁后一定能唤醒被park阻塞的后继节点


parkAndCheckInterrupt

注意前提:只有在前驱节点为SIGNAL状态下才能park阻塞当前节点

  1. /**
  2. * 线程park进入等待状态,并返回是否被中断唤醒
  3. * @return {@code true} if interrupted 若被中断唤醒返回true,否则false
  4. */
  5. private final boolean parkAndCheckInterrupt() {
  6. //PS:park后线程进入等待状态,线程阻塞在此处,不会继续往后执行,即不会执行到步骤2
  7. LockSupport.park(this);//步骤1
  8. /**
  9. * 需要注意的是静态方法会清除中断标志
  10. * 想要中断任务的话需要在外部再次设置中断
  11. *
  12. * PS:线程只有被unpark唤醒后才会接着继续执行步骤2
  13. */
  14. return Thread.interrupted();//步骤2
  15. }
友情推荐:

- 关于中断机制读者可参看笔者的 并发番@Thread一文通(1.7版) 6.线程中断

- 关于park读者可参看笔者的 并发番@LockSupport一文通(1.8版)

selfInterrupt

  1. /**
  2. * 补中断:设置线程中断标志为true
  3. */
  4. static void selfInterrupt() {
  5. Thread.currentThread().interrupt();
  6. }

unparkSuccessor

唤醒后继节点有三个注意事项:

- 1.状态位变化:初始状态0 -> Node.SIGNAL(作为唤醒后继节点的依据) -> 0 (唤醒后继节点后回归纯真)

- 2.若当前节点的后继节点不存在或是取消状态,需要反向遍历去找最靠近当前节点的非取消状态后继节点

- 3.若存在非取消状态的后继节点就唤醒他,当然找不到就算了(说明没有需要获取锁的节点存在啦)

  1. /**
  2. * 唤醒当前节点的后继节点
  3. * @param node the node 当前节点
  4. */
  5. private void unparkSuccessor(Node node) {
  6. int ws = node.waitStatus;
  7. //重置当前节点标志位
  8. if (ws < 0)
  9. /**
  10. * 若是负数就表示未取消,需要回归纯真状态-> 0
  11. * 允许失败,因为只有取消状态的节点才会被回收
  12. */
  13. compareAndSetWaitStatus(node, ws, 0);
  14. /*
  15. * Thread to unpark is held in successor, which is normally just the next node.
  16. * But if cancelled or apparently null,traverse backwards from tail to find
  17. * the actual non-cancelled successor.
  18. *
  19. * 通常node.next就是要被唤醒的后继节点
  20. * 但若next节点被取消或为空,需要反向遍历去找最靠前的非取消状态节点并唤醒她
  21. * waitStatus <= 0 说明未取消,就是节点还想获取锁的意思
  22. * 共享模式时会发生s == null的情况
  23. */
  24. Node s = node.next;
  25. if (s == null || s.waitStatus > 0) {
  26. s = null;
  27. //换句话说,当t == null || t == node时停止循环
  28. for (Node t = tail; t != null && t != node; t = t.prev)
  29. //找到同步队列中最靠前(即最靠近node)的想要获取锁的节点
  30. if (t.waitStatus <= 0)
  31. /**
  32. * 小问:为啥是最靠前呢?
  33. *
  34. * 小答:因为在这里会不断把非取消状态的节点赋值给s
  35. * 直到是最靠近node(即最靠前)的时候才停止
  36. * 而不是一旦找到一个非取消状态的节点就停止循环啦!
  37. */
  38. s = t;
  39. }
  40. /**
  41. * 如果后继节点非空,那就让我们唤醒她吧!
  42. */
  43. if (s != null)
  44. LockSupport.unpark(s.thread);
  45. }

小问:为神马要沿着prev从tail往前获取非取消节点呢?
友情小提示:读者可以从入队、出队、取消操作入手考虑prev和next的线程安全问题

小答:这个问题实际上考察了两个知识点,一个是为何要反向遍历,另一个是为何要跳过取消节点:

1.反向遍历:prev赋值线程安全,next赋值非线程安全且在CLH队列不同形态会有不同的表现,判断逻辑会比prev复杂很多

2.跳过取消节点:取消状态节点都是需要被GC回收的节点,因为其放弃了获取锁的机会,正所谓强扭的瓜不甜呀!


小问:有心的读者会发现在unparkSuccessor方法中其实并没有操作队列,那么线程节点是何时出队的呢?
友情小提示:读者可以回顾一下获取锁的整个过程重新梳理一下

小答:unparkSuccessor方法主要做了个唤醒操作,真正的出队操作(比如调整prev、next)是在setHeadshouldParkAfterFailedAcquirecancelAcquire等方法中实际完成的


题外话

笔者看了一下,距离上篇已过了一月有余,以至于期间不止一人问笔者是否已断更(疼)。笔者正好借此机会谈一下笔者的想法,笔者会一直写下去,之所以长时间不更新,主要有如下几点原因:
1.不可原谅的客观原因:笔者近期的确事情很多,不止于工作
2.出一篇令自己满意的、接近"精品"的文章真的给了笔者很大的压力,唯恐误人子弟
3.为了此篇,笔者花了很长时间去看了很多资料(也包括翻译Doug Lea老师的论文),但由于水平有限,在加上此篇的确需要好好研读,因此无论从构思还是内容都花了大量的时间,期间笔者都不知修改了多少次都不甚满意,甚至重写的情况,排期真是一变再变
4.由于并发的难以预料带来的难以理解,因此在理解AQS时,需要从多个维度去思考问题,包括每行代码的意义、CAS操作(如为何此处用CAS?CAS成功后?CAS失败后?)、从单个线程到并发、从入队到出队、从好人模式切换到坏人模式等等,AQS篇的精妙之处还远不止笔者目前所述(此篇还是略显仓促),因此随着笔者功力的加深,笔者会坚持继续维护此篇
5.因为种种原因造成的此篇内容有错或者不足之处,笔者十分抱歉,也感谢读者能够热心指点一二,不胜感激!!!
6.最好还是希望读者们继续支持笔者,支持XX番系列~

并发番@AbstractQueuedSynchronizer一文通黄志鹏kira 创作,采用 知识共享 署名-非商业性使用 4.0 国际 许可协议 进行许可。

本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注