@kiraSally
2021-05-26T15:42:03.000000Z
字数 33534
阅读 3690
并发
1.8版
- 推荐先阅读笔者的 并发番@AQS框架一文通 一文
- 感谢支持 笔者掘金专栏博文
作用: AQS是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义
功能: AQS框架提供实现阻塞锁和依赖FIFO等待队列的关联同步器
按需调用: AQS并不会实现任何同步接口,相反仅是提供一些方法以便具体锁和关联同步器按需调用
适用: 该类适用于依赖单个原子int变量表示同步状态的多种形式的同步器
原理: 内部使用一个volatile int变量表示同步状态,通过FIFO同步队列实现资源获取线程的排队工作,通过UnSafe实现底层的等待与唤醒操作,通过ConditionObject实现条件变量的使用
同步状态: 即「volatile int State」变量,该状态其实质就是可用资源数(因此是数值而不是布尔值)
Node节点: 队列操作的基本元素,线程入队前会先被封装成Node节点,其会记录队列操作所需的重要属性
同步队列: FIFO等待队列,CLH锁的变种实现并同时支持独占和共享模式,当线程获取锁失败会进入同步队列中等待,成功获取锁或因中断、异常等原因获取锁失败时出队
条件队列: 只用于独占模式, 且使用Condition的前提是线程已经获取到锁,并发番@ConditionObject一文通
AQS中加锁的基本流程如下(以独占模式为例):
- 论文版:
//循环判断同步状态是否可取
while (synchronization state does not allow acquire) {
//不可取时,线程入同步队列(若尚未进入同步队列)
enqueue current thread if not already queued;
//阻塞当前线程
possibly block current thread;
}
//成功获取锁后出队(当然异常失败也需要出队)
dequeue current thread if it was queued;
- 实现版:
//1.tryAcquire会CAS更新State,更新成功获取锁,否则进入同步队列以自旋方式获取锁
if(!tryAcquire(arg)){
//自旋方式获取锁
for (;;) {
//2.若当前节点的前驱节点为head, 则再次尝试获取锁
if(node.prev == head && tryAcquire(arg)){
//3.获取锁后重设head,共享模式下需要传播唤醒后继节点
setHead(node);
}
//4.获取锁失败,则安全更新前驱节点的waitStatus的值为SINGAL并对当前节点的线程进行阻塞
CAS(node,waitStatus,SINGAL) && park(node)
}
}
此时我们可以先关注四个点:state、队列头节点、节点的waitSatus、阻塞当前线程
AQS中解锁的基本流程如下(以独占模式为例):
- 论文版:
//更新同步状态
update synchronization state;
//判断同步状态是否可取
if (state may permit a blocked thread to acquire)
//同步状态可取则从同步队列中释放阻塞线程
unblock one or more queued threads;
- 实现版:
//1.tryRelease会CAS更新State为0,成功返回true
if(tryRelease(arg)){
//2.若是SIGNAL,则将waitStatus回归为0同时唤醒后继节点
if (node.waitStatus != 0)
CAS(node,waitStatus,0) && unpark(node.next);
}
此时我们可以先关注四个点:state、出队、节点的waitStatus、唤醒后继线程
独占模式: 即只有一个线程能持有锁(单资源,排他性),AQS提供「acquire」「release」方法
共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireShared」「releaseShared」方法
独占模式: 即只有一个线程能持有锁(单资源),AQS提供「acquireInterruptibly」「tryAcquireNanos」方法
共享模式: 即有多个线程能持有锁(多资源),AQS提供「acquireSharedInterruptibly」「tryAcquireSharedNanos」方法
实现独占: 「tryAcquire」「tryRelease」「isHeldExclusively」
实现共享: 「tryAcquireShared」「tryReleaseShared」
补充: 在使用时使用者只需要实现独占和共享的其中一种即可(如「ReentrantLock」),当然也支持两者都实现(如「ReentrantReadWriteLock」)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
protected AbstractQueuedSynchronizer() { }
/**
* CLH锁同步队列的头节点,延迟初始化(懒加载)
* 除了初始化,其他时刻只能被setHead进行更新
* 注意:当head存在时,waitStatus不能是CANCELLED状态
*/
private transient volatile Node head;
/**
* CLH锁同步队列的尾节点,延迟初始化(懒加载)
* 只能在入队时新增一个Node时进行更新
*/
private transient volatile Node tail;
/**
* 同步状态 - volatile保证其可见性
*/
private volatile int state;
Node: 每个线程会被封装成一个Node节点,其中会记录线程、节点状态和前后节点等信息,详情请参见3.Node节点
ConditionObject: 条件变量,用于实现管程形式的条件控制,详情请参见并发番@AbstractQueuedSynchronized一文通
作用: 用于提供CAS原子更新操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS原子更新head节点,仅用于同步队列的enq入队操作
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS原子更新tail节点,仅用于同步队列的enq入队操作
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS更新原子更新Node节点的waitStatus变量
*/
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
/**
* CAS next field of a node.原子更新Node节点的next变量,仅用于cancelAcquire取消获取操作
*/
private static final boolean compareAndSetNext(Node node,Node expect,Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
作用: AQS框架的变种CLH锁借由Node组成的「FIFO双向链表队列」实现
链接: 每个Node通过「pred」链接其前驱节点,通过「next」链接其后继节点
条件支持: 每个Node同时会通过「nextWaiter」提供对「Condition」的支持
模式: 每个Node都可以支持独占「EXCLUSIVE」或共享「SHARED」模式
初始化: CLH锁只有在第一次入队时(即第一次出现竞争时)会初始化「Head」和「Tail」,主要是性能考究(默认少竞争)
static final class Node {
/** 标记共享模式 */
static final Node SHARED = new Node();
/** 标记独占模式 */
static final Node EXCLUSIVE = null;
/** 标记节点被取消 */
static final int CANCELLED = 1;
/** 标记后继节点需要被唤醒 */
static final int SIGNAL = -1;
/** 标记节点位于条件阻塞队列中 */
static final int CONDITION = -2;
/**
* 标记共享模式下,节点共享状态正在被传播(acquireShared)
* 当前节点获得锁或释放锁时, 共享模式下节点的最终状态是 PROPAGATE
*/
static final int PROPAGATE = -3;
/**
* !!!重中之重!!!
* 标记节点状态,默认为0,负数无须唤醒,使用CAS原子更新
* 独占模式:SIGNAL、CANCEL、0
* 共享模式:SIGNAL、CANCEL、PROPAGATE、0
* 条件变量:CONDITION状态不会存在于CLH锁同步队列中,只用于条件阻塞队列
*/
volatile int waitStatus;
/**
* 在CLH锁同步队列中链接前驱节点,使用CAS原子更新,每次入队和GC出队时会被指派
* 当前驱节点被取消时,一定能找到一个未被取消的节点,因为Head节点永远不会被取消:头节点必须成功aquire
* 被取消的线程不会再次成功aquire,线程只能取消自己不会影响其他
* 主要作用是在循环中跳过CANCELLED状态的节点
*/
volatile Node prev;
/**
* 在CLH锁同步队列中链接后继节点,每次入队、前驱节点被取消以及GC出队时被指派
* 赋值操作非线程安全,next为null时并不意味着节点不存在后继节点
* 当next不为null时,next是可靠的
* 主要作用是在释放锁时对后继节点进行唤醒
*/
volatile Node next;
/** Node关联线程 */
volatile Thread thread;
/**
* 链接位于条件阻塞队列的节点或特定SHARED值
* 实际作用就是标记Node是共享模式还是独占模式
* 独占模式时为null,共享模式时为SHARED
* 在条件阻塞队列中指向下一个节点
*/
Node nextWaiter;
/**
* 判断Node是否为共享模式
* @Return true 是 false 不是
*/
final boolean isShared() {
//当是共享模式时,nextWaiter就是SHARED值,独占模式就是null
return nextWaiter == SHARED;
}
/**
* 返回前驱节点,当前驱节点为空时直接抛空指针异常(实际上Head永远不会为null)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
//空指针判断只要是为了help gc
if (p == null)
throw new NullPointerException();
else
return p;
}
//默认共享模式
Node() {}
// Used by addWaiter 用于CLH锁同步队列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// Used by Condition 用于条件阻塞队列
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
SIGNAL(-1): 标记唤醒,当前节点被释放后必须唤醒后继节点
CANCELLED(1): 标记已取消,当Node因超时或中断被取消,取消状态不可变且对应线程不可再次阻塞
CONDITION(-2): 标记条件阻塞,即Node位于条件变量的阻塞队列中(或者说是条件阻塞队列)
PROPAGATE(-3): 标记传递中,仅用于标记位于同步队列的头节点,表示共享状态该正在被传递中
0: 默认为0,当为条件阻塞时默认为-2,非负数意味着无须唤醒,此值使用CAS原子更新
1.必须使用CAS对「State」状态进行原子更新
2.当「State」状态>0时,说明当前线程已持有该锁;当「State」状态=0时,说明当前线程无该锁
3.由于State可自增,因此可用于实现可重入,如「ReentrantLock.lock()」
4.独占模式中「State」的值最多为1 ,共享模式中「State」的值可以任意大(如「CountDownLatch」)
/**
* 同步状态
*/
private volatile int state;
/**
* 返回当前同步状态,该操作具有volatile内存读语义 - 直接从主内存读取到最新值
*/
protected final int getState() {
return state;
}
/**
* 设置当前同步状态,该操作具有volatile内存写语义 - 一旦变更直接刷新到主内存
*/
protected final void setState(int newState) {
state = newState;
}
/**
* CAS更新当前同步状态,该操作同时具备volatile内存读-写语义
* @param expect 期望值
* @param update 新值
* @return {@code true} 成功返回true,失败返回false(即实际值!=期望值)
*/
protected final boolean compareAndSetState(int expect, int update) {
// 底层调用通过Unsafe调用CAS
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
1.封装Node: 将当前线程封装成Node同时指定独占或共享模式
2.初始化: 当未初始化时,初始化头节点head和尾节点tail
3.队尾追加: 根据FIFO原则,新增节点会被追加到队尾
4.CAS更新tail: 通常将新增节点作为新的tail
小问:如何保证新增节点一定入队并且tail设置成功?
友情小提示:读者可以思考CAS结合重试的解决方案
/**
* 新增一个等待者
* - 创建和入队一个新node
* - 根据FIFO,新增的追加到队尾并被设置为tail
*
* @param mode Node.EXCLUSIVE for exclusive 独占, Node.SHARED for shared 共享
* @return the new node 返回新Node
*/
private Node addWaiter(Node mode) {
//1.当前线程封装成Node,并指定独占或共享模式
Node node = new Node(Thread.currentThread(), mode);
//记录原tail -> 根据FIFO,新增的追加到队尾并被设置为tail
Node pred = tail;
/**
* 当pred为null时,说明还未初始化,应该直接走enq方法完成初始化
* if代码块是快速入队的一个优化:仅当CAS操作失败才会进入enq方法进行自旋
*/
if (pred != null) {
node.prev = pred;
/**
* CAS更新tail -> 将新增节点设置为tail
* 允许CAS失败,如果失败直接进入enq,采用自旋方式入队
*/
if (compareAndSetTail(pred, node)) {
//将原tail节点的next链接指向当前节点
pred.next = node;
return node;
}
}
//采用自旋方式入队
enq(node);
return node;
}
/**
* 向队尾插入一个节点,未初始化(tail为null)时完成初始化
* 必须CAS更新Head和Tail
* @param node the node to insert 待插入节点
* @return node's predecessor 待插入节点的前驱节点(即原tail)
*/
private Node enq(final Node node) {
//自旋方式入队
for (;;) {
Node t = tail;
/**
* 2.初始化 -> 当tail为null时就意味着队列为空
* - 初始化时会同时生成head和tail
* - 其中head一直作为dummy节点存在,不需要存储thread,
* 但需要记录waitStatus字段,以作为唤醒后继节点的依据
* - 其中tail一直作为实体节点存在,会存储thread
*/
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
/**
* 3.新节点追加到队尾,关于prev有三个重要论点:
* 一.Node使用prev(前驱节点)作为形成链的根本依据
* 二.当节点位于同步队列中,prev一定非空
* 三.但prev非空并不意味着节点位于同步队列中
* 因为发生竞争时CAS更新tail是允许失败的,一旦CAS失败就再自旋一次
* 当CAS更新tail失败时,由于节点只是将prev指向tail但并没有设置tail成功
* 此时并不能算作真正的入队(原因在于后面获取和释放操作都是基于tail的)
*/
node.prev = t;
/**
* 4.CAS更新tail,关于next有三个重要论点:
* 一.Node使用next作为形成链的一种优化辅助手段
* 二.当next非空,节点一定存在同步队列中
* 三.但节点存在同步队列时,next不一定非空
*/
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
小问:为神马说next是非线程安全的?有什么隐患吗?
友情小提示:读者可以从tail的更新时机角度考虑
t.next=node
,此时节点已经真正入队;t.next=node
非CAS操作,因此是非线程安全的;小问:为神马要加入一个 dummy 节点(head)?
友情小提示:读者可以从CLH锁变种考虑
1.设置Head: Head指向成功获取锁(即待出队)的节点,作为新的头节点
2.清空当前节点引用: 待出队的节点需要将thread和prev属性设置为null,help gc
3.清空前驱节点next: 由于当前节点需要被GC,因此也要清除其前驱节点的next
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node 成功获取锁的节点(即待出队节点)
*/
private void setHead(Node node) {
// 1.head指向待出队节点
head = node;
// 2.清空当前节点引用
node.thread = null;
node.prev = null;
}
// 3.清空前驱节点next
p.next = null;
小问:为神马要清空节点引用??
友情小提示:读者可以从Node的内部变量的作用以及head作用在同步队列中的作用这方面去考虑
1.其核心在于waitStatus的值(当为SIGNAL-1时才会去唤醒后继节点),因此Node的其他属性并不重要,Head更多是虚拟节点的存在,只存储waitStatus即可,以作为唤醒后继的依据
2.同时清空thread、prev、prev.next有利于GC回收待出队的节点,因为该节点在给head设置完waitStatus之后就完成了使命,线程可以出队了
1.设置Head: 设置头节点,同时检测在共享模式下是否有后继者等待获取锁
2.向后传播唤醒: 如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* 共享模式成功获取锁出队
* - 设置头节点并继续向后传播
* 1.设置头节点,同时检测在共享模式下是否有后继者等待获取锁
* 2.如果存在,则在满足(propagate > 0 或 节点状态为PROPAGATE)时传播唤醒
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
//设置头节点
Node h = head; // Record old head for check below
setHead(node);
/**
* 这里做了很多逻辑或判断,主要目的就是为了判断是否还有剩余资源以唤醒后续线程节点
* 1.propagate > 0 :意味着还有剩余资源(state>0),共享时当然需要继续唤醒
* 2.h == null : 头节点为空(这个比较诡异-we don't know, because it appears null)
* 3.h.waitStatus <0 : 头节点状态为负数(尤其是PROPAGATE),说明需要唤醒后继节点
*
* 注意: 这些保守的检查,在竞争环境下获取/释放锁可能会导致不必要的多次唤醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//注意:一旦遇到节点是独占模式,即使propagate>0也会停止往后传播啦
if (s == null || s.isShared())
// 唤醒后继结点
doReleaseShared();
}
}
小问:何时会出现h==null的情况?
友情小提示:这个可能需要测试极端环境,如果有读者有想法的欢迎留言指导笔者一二
友情小提示:建议在看完aquire操作之后再回看此章
1.清空线程引用:清空当前节点的线程引用,便于当前节点出队和GC
2.寻找非取消状态前驱节点:沿prev反向遍历直到找到一个非取消状态前驱节点
3.设置CANCELLED状态:核心步骤,失败的节点需要设置为取消状态
4.清除、重置、唤醒:见步骤4-8
/**
* Cancels an ongoing attempt to acquire.
*
* 清除因中断/超时而放弃获取lock的线程节点
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 1.清空线程引用
node.thread = null;
// Skip cancelled predecessors 跳过所有取消状态前驱节点
Node pred = node.prev;
// 2.沿prev反向遍历直到找到一个非取消状态前驱节点,同时也顺便清除途中出现的取消状态节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 注意predNext也是需要清除的
Node predNext = pred.next;
/**
* 3. 节点状态需要设置为CANCELLED,即标记为可回收的无用节点
* CANCELLED状态的节点之后除了被清除不会再参与任何操作,等同于"垃圾"了
*/
node.waitStatus = Node.CANCELLED;
/**
* 4.若需要清除的节点恰好是尾节点,需要将前驱节点CAS设置为新的尾节点
* 允许CAS失败,一旦失败会转而执行步骤6或8
*/
if (node == tail && compareAndSetTail(node, pred)) {
/**
* 5.删除节点predNext -> 因为当前节点已经无用了(断链)
* 允许CAS失败即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
*/
compareAndSetNext(pred, predNext, null);
} else {
int ws;
/**
* 6.这里会对next重置,同时会涉及多个判断,主要由三个逻辑与条件组成:
* 一.前驱节点非头节点 -> 步骤8会有解释
* 二.前驱节点为SIGNAL状态 或 非取消时CAS设置为SINGAL成功 -> 即最终前驱节点必须是SIGNAL
* 三.前驱节点的线程非空 -> 线程若为空,那么问题来了,谁想获取锁,卧底吗???
* 补充一点:head是不会储存thread的,因此之前会有个pred!=head的判断
*/
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
/**
* 7.若后继节点waitStatus非取消状态,说明后继节点是想获取锁的
* 此时需要将next重置,主要目的是断开与当前节点的链接,建立新的链接
* 允许CAS失败,即使失败影响也不大,next更多是优化手段,prev才是根本的判断依据
*/
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 8.若前驱节点正好是head节点或是取消状态,进入唤醒步骤,具体参见8.4 unparkSuccessor
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
1.不响应中断获取锁-acquire: 不响应中断指的是线程获取锁时被中断后能被重新唤醒并继续获取锁,在方法返回后会根据中断状态决定是否重新设置中断
2.响应中断获取锁-acquireInterruptibly: 响应中断指的是当线程获取锁时被中断会立即抛出异常,获取失败
3.响应中断和超时获取锁-tryAcquireNanos-: 处理方式等同响应中断获取
,区别是多了超时后直接返回fasle,获取失败
1.tryAcquire: 初次调用子类自实现的tryAcquire方法获取锁,成功即获得锁,否则进入第二步
2.addWaiter: 当前线程封装为独占Node并进入同步队列,等待前驱节点的SIGNAL状态并进入第三步
3.acquireQueued: 自旋获取锁直到调用tryAquire成功获取锁为止(前驱节点为Head时就会尝试调用tryAcquire),自旋过程中可能多次阻塞和解除阻塞,值得注意的是park是进入等待状态
4.selfInterrupt: 若线程获取锁途中被中断,当成功获取锁后,由于中断状态被中途清除,需要补中断状态
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented by invoking
* at least once {@link #tryAcquire},returning on success.
* Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquire} until success.
* This method can be used to implement method {@link Lock#lock}.
*
* 独占模式获取锁,不响应中断
* 1.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
* 3.该方法可用于实现Lock接口的lock方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
//若初次调用tryAcquire失败需要封装成独占Node并加入到同步队列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//若线程成功获取锁但之前已被中断,由于中断状态被中途清除,需要再次设置中断状态
selfInterrupt();
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* 独占式不响应中断获取锁(当线程已在同步队列中)
* 同时条件队列的wait方法也是使用该方法执行独占式不响应中断获取锁操作
*
* @param node the node 当前线程节点
* @param arg the acquire argument 期望state状态值,通常是1
* @return {@code true} if interrupted while waiting 当获取锁过程中被中断返回true
*/
final boolean acquireQueued(final Node node, int arg) {
//记录获取锁是否失败
boolean failed = true;
try {
/**
* 记录获取锁过程中是否被中断
*/
boolean interrupted = false;
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {
//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);
/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;
//返回获取锁途中是否被中断过
return interrupted;
}
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//途中哪怕只被中断一次也要设置中断为true
interrupted = true;
}
} finally {
//失败处理
if (failed)
cancelAcquire(node);
}
}
小问:为神马前驱节点为head时需要再次尝试获取锁呢?
友情小提示:读者可以从前驱节点为head时的锁获取情况去考虑
**1.前驱节点已成功获取锁并正在占用该锁,但可能很快释放**
**2.前继节点是空节点, 此时已经释放锁, 因此后继节点就有机会获取锁了**
1.当获取锁的过程中发生中断,立即抛出中断异常,然后进入finally处理失败
2.同时少了设置中断状态的步骤
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on success.
* Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* 独占式响应中断获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁或发生中断
* 4.该方法可用于实现Lock接口的lockInterruptibly方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();
//若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
*
* 独占式响应中断超时获取锁
*
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//当前线程封装成Node
final Node node = addWaiter(Node.EXCLUSIVE);
//记录获取锁是否失败
boolean failed = true;
try {
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {
//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);
/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;
return;
}
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//失败处理(如线程中断)
if (failed)
cancelAcquire(node);
}
}
小问: 若 if (p == head && tryAcquire(arg)) { //恰好在执行到该方法内部时发生中断 }
,此时会如何处理中断?
友情小提示:读者可以从线程状态角度去考虑
小答:此时该中断会被忽略,原因是该线程目前是运行态,而运行态是不响应中断的
友情推荐:关于中断响应机制读者可参看笔者的 并发番@Thread一文通(1.7版)
1.若获取锁的过程中超过超时阈值,会先执行超时阻塞;否则会先再次自旋
2.一旦超时立即返回fasle,然后进入finally处理失败
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* 独占式响应中断超时获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.若获取锁超时会立即返回,即获取锁失败
* 3.成功获取锁的实现是至少一次调用tryAcquire成功并返回true
* 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquire成功获取锁
* 或发生中断或超时
* 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();
/**
* 若初次调用tryAcquire失败需要封装成独占Node并进入同步队列
* 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
*/
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* 独占式响应中断超时获取锁
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//超时时间必须有效
if (nanosTimeout <= 0L)
return false;
//超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;
//当前线程封装成独占模式Node
final Node node = addWaiter(Node.EXCLUSIVE);
//记录获取锁是否失败
boolean failed = true;
try {
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
if (p == head && tryAcquire(arg)) {
//若成功获取锁,head要指向当前节点,即head是获取到锁的那个节点或者是null
setHead(node);
/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;
//成功获取锁需要返回成功标识,通知外部调用成功
return true;
}
//剩余时间
nanosTimeout = deadline - System.nanoTime();
//超时立即返回false,同时会在finally中执行失败处理
if (nanosTimeout <= 0L)
return false;
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
//失败处理(如线程中断或超时)
if (failed)
cancelAcquire(node);
}
}
小问:为神马要设置超时阈值?
友情小提示:读者可能从锁优化角度考虑
小答:在剩余时间(nanosTimeout)小于超时阈值(spinForTimeoutThreshold)时,自旋的效率比LockSupport.park更高且开销更少
1.调用子类的tryRelease释放锁资源,若有重复锁需要完全释放
2.当head的waitStatus状态非0,意味着同步队列为空,需要尝试唤醒同步队列中的下一个等待唤醒的线程
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* 独占模式释放锁,成功释放依据是tryRelease返回true
* 该方法被用来实现Lock.unlock方法
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
//调用tryRelease判断是否已成功完全释放锁 --即Status是否被CAS更新为0
if (tryRelease(arg)) {
Node h = head;
/**
* 当head的waitStatus状态非0,意味着同步队列为空
* 需要尝试唤醒同步队列中的下一个等待唤醒的线程
*/
if (h != null && h.waitStatus != 0)
//唤醒同步队列中的下一个等待唤醒的线程
unparkSuccessor(h);
return true;
}
return false;
}
分情况讨论:
1.队列为空:直接返回,不走unpark
2.队列非空:
2-1 head刚初始化此时waitstatus = 0,且此时新节点尚未入队(enq尚未结束) ,不走unpark
2-2 新节点入队完成(enq结束),(进入aquireQueue)但第一次自旋时就拿到锁,此时head的waitstatus = 0 且setHead之后仍为0,不走unpark
2-3 新节点入队完成,但第一次自旋后拿不到锁,则自身的会waitstaus变更为-1(shouldParkAfterFailedAcquire);若在调用should方法之前触发release,由于在should中cas为-1后返回false导致在aquireQueue中继续自旋
2-4 新节点入队完成&&head的waitstatus=-1,但此时重新自旋获取锁仍然失败,若should发生在casStatus之前则直接返回true否则重新cas为-1并自旋, 此时至多消耗一次无用的unpark操作;若should发生在casStatus之后,
1.不响应中断获取锁-acquireShared
2.响应中断获取锁-acquireSharedInterruptibly
3.响应中断和超时获取锁-tryAcquireSharedNanos
1.子类需要实现共享方式获取锁tryAcquireShared
2.节点出队方式变更,当获取锁成功时前驱节点出队时会传播唤醒操作
1.tryAcquireShared: 先调用子类自实现的tryAcquireShared方法获取锁,成功即获得锁,否则进入第二步
2.doAcquireShared: 当前线程封装为共享Node并进入同步队列,自旋+共享方式获取锁
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* 共享模式获取锁,不响应中断
* 1.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 2.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
/**
* 与独占模式的核心区别之一:
* 通过调用子类实现的tryAcquireShared获取锁
* tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
*/
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
*
* 共享模式获取锁,不响应中断
*
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//当前线程封装为共享Node并进入同步队列,等待前驱节点的SIGNAL状态
final Node node = addWaiter(Node.SHARED);
//记录获取锁是否失败
boolean failed = true;
try {
/**
* 记录获取锁过程中是否被中断
* 注意提供中断响应的方法没有该变量,而是选择直接抛出中断异常
*/
boolean interrupted = false;
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);
/**
* 由于setHeadAndPropagate中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null;
/**
* 与独占模式的核心区别之三:
* 获取锁途中一旦被中断,直接设置中断标识为true
*/
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//失败处理
if (failed)
cancelAcquire(node);
}
}
小问:为神马共享式不响应中断获取锁可以直接设置中断,而独占式却是返回中断状态(核心区别之三)?
友情小提示:读者可以用selfInterrupt()方法的复用情况方面考虑
小答:这涉及到条件队列的部分内容,条件队列只能用于独占模式(因为使用条件队列的前提就是先获取到锁 -- 管程要求),而在条件队列的多个方法中会根据判断条件决定是否要执行selfInterrupt()方法,因此在共享模式中可以直接中断,而独占需要返回中断状态告知独占或条件队列是否执行selfInterrupt()方法
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* 共享式响应中断获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 3.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁或发生中断
* 4.该方法可用于实现Lock接口的lockInterruptibly方法
* @param arg the acquire argument.This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();
/**
* 与独占模式的核心区别之一:
* 通过调用子类实现的tryAcquireShared获取锁
* tryAcquireShared < 0 说明没有获取到锁 ,因此需要确定好State的资源量
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
//当前线程封装成共享Node
final Node node = addWaiter(Node.SHARED);
//记录获取锁是否失败
boolean failed = true;
try {
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//前驱节点是head时需要再次尝试tryAcquire --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 一旦发生中断,立即抛出异常,停止自旋,然后进入finally处理失败
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//失败处理(如线程中断)
if (failed)
cancelAcquire(node);
}
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquireShared}, returning on success. Otherwise, the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
*
* 共享式响应中断超时获取锁
* 1.期间发生中断会立即抛中断异常停止获取锁,即获取锁失败
* 2.若获取锁超时会立即返回,即获取锁失败
* 3.成功获取锁的实现是至少一次调用tryAcquireShared成功并返回非负数
* 4.否则线程将进入同步队列,期间可能多次阻塞和解除阻塞,直到调用tryAcquireShared成功获取锁
* 或发生中断或超时
* 5.该方法可用于实现Lock接口的tryLock(long, TimeUnit)方法
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
/**
* 获取前若中断,就二话不说直接抛出异常快速失败,还获取什么锁呀,浪费资源
* 注意静态方法会清除中断标识
*/
if (Thread.interrupted())
throw new InterruptedException();
/**
* 若初次调用tryAcquireShared失败需要封装成独占Node并进入同步队列
* 由于超时特性,会返回布尔值告知外部是否成功获取锁或获取锁是否超时
*/
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Acquires in shared timed mode.
*
* 共享式响应中断超时获取锁
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
//超时时间必须有效
if (nanosTimeout <= 0L)
return false;
//超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;
//当前线程封装成共享模式Node
final Node node = addWaiter(Node.SHARED);
//记录获取锁是否失败
boolean failed = true;
try {
//自旋-无限循环尝试获取锁,期间可能多次阻塞和解除阻塞,主要是等待前驱节点释放锁或被中断
for (;;) {
final Node p = node.predecessor();
//注意传播也是从head开始一个个唤醒的,不是一次性的(只不过快到好比一次性)
if (p == head) {
//前驱节点是head时需要再次尝试tryAcquireShared --成功获取锁的关键
int r = tryAcquireShared(arg);
if (r >= 0) {
/**
* 与独占模式的核心区别之二:
* 当获取锁成功时前驱节点出队,区别于独占模式,共享模式会往
* 后传播唤醒操作,目的是保证还在等待的线程能够尽快获取到锁
*/
setHeadAndPropagate(node, r);
/**
* 由于setHead中node.prev=null,这里将p.next = null
* 就意味着之前已获取到锁的节点已经出队,可以安心回收啦
*/
p.next = null; // help GC
failed = false;
//成功获取锁需要返回成功标识,通知外部调用成功
return true;
}
}
//剩余时间
nanosTimeout = head || rline - System.nanoTime();
//超时立即返回false,同时会在finally中执行失败处理
if (nanosTimeout <= 0L)
return false;
/**
* 看看能不能安心park,不能的话再来一趟自旋,不怕累
* 若能安心park,则进入等待状态,直到被unpark唤醒
* 唤醒后会再来一次循环,因此因为自旋的存在,可能存在多次park和unpark
* 若同时剩余时间超过自旋时间阈值(默认1000L)即超时阻塞
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckI
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//若线程在获取锁过程中被中断,立即抛出异常,同时会在finally中执行失败处理
if (Thread.interrupted())
throw new InterruptedException();
}
}
} finally {
//失败处理(如线程中断或超时)
if (failed)
cancelAcquire(node);
}
}
当同步队列中存在多个共享节点且资源够数时可能会并发地唤醒后继节点,因为共享模式下获取锁后就近唤醒后继节点
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* 共享模式释放锁
* 实现原理是tryReleaseShared返回true时解除一个或多个线程的阻塞
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
//自旋方式释放锁
for (;;) {
Node h = head;
//队列不为空且有后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//唤醒前提都是Head节点状态为SIGNAL
if (ws == Node.SIGNAL) {
//只有当CAS回归纯真成功时才选择去唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; //(1) loop to recheck cases
unparkSuccessor(h);
//之所以要设置PROPAGATE主要是区别于独占模式的0,以告知到时要用传播方式进行后续处理
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //(2) loop on failed CAS
}
//如果head没变化,说明没有唤醒的线程啦,功成名就
if (h == head) //(3) loop if head changed
break;
//其他情况,比如head节点无变化、ws为取消状态等,直接跳过不理会就好
}
}
小问:共享模式为何采用自旋方式释放锁?
友情小提示:遇到CAS操作,我们都需要考虑三个点:1.为何此处用CAS 2.若CAS成功呢? 3.若CAS失败呢?
补充小提示:CAS之所以会失败,通常都是发生了竞争,当然也可能是异常、中断等,因此失败原因很重要
1.前驱节点状态为SIGNAL: 一定会唤醒后继节点,直接返回true,说明可以安心park
2.前驱节点状态为CANCELLED: 需要反向找到一个非取消状态节点作为新前驱节点,返回false,此时不允许park
3.前驱节点为0或PROPAGATE(共享): 由于前驱节点和当前节点都存在,因此存在前驱节点需要唤醒后继节点的必要性,因此前驱节点需要被设置为SIGNAL状态
/**
* 当节点没有获取到锁时,通过检查和更新waitStatus,
* 以判断线程是否可以安心park进入等待状态
*
* @param pred 前驱节点
* @param node the node 当前节点
* @return {@code true} 线程应被阻塞时返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/**
* 线程能否安心park进入等待状态取决于前驱节点的waiStatus值
* 换句话说,前驱节点的waiStatus值决定了当前节点的'命'!
*/
int ws = pred.waitStatus;
//1.waitStatus == Node.SIGNAL 待唤醒状态
if (ws == Node.SIGNAL)
/**
* 只有前驱节点为SIGNAL状态,当前线程才能安心被park
* 否则前驱节点在获取锁之后是不会通知当前线程的,
* 当前线程一脸懵逼的哭死在无限等待中?!
*/
return true;
//2.waitStatus >0 即 waitStatus == Node.CANCELLED == 1 取消状态
//取消状态时需要重选一个非取消状态的前驱节点
if (ws > 0) {
/**
* 沿prev反向遍历直到找到一个非取消状态的前驱节点
*
* 小问:那么中间那些取消状态的节点怎么办??
* 小答:笑话,GC老爷子怎么会忘记回收在队列中
* 这些'在其位不谋其政'(放弃获取锁但仍在队列中)的'垃圾们'呢 -
* 正经说法:由于前塞动作,已放弃的节点相当于形成一条无引用链,不久就会被GC
*/
do {
//前塞动作 已放弃获取锁的取消状态节点会形成一条无引用链
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//注意不要忘记将新的前驱节点的next指向当前节点(相爱相杀不能忘)
pred.next = node;
} else {
//3.waitStatus == Node.PROPAGATE || 0
/**
* 为了后继节点能够安心park,需要将前驱节点设置为SIGNAL状态
* 这样当前驱节点拿到锁了就可以通知后驱节点(即当前线程):后面那个,差不多该醒啦!
* 注意:该方法不存在Node.CONDITION的情况
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//前驱节点非Node.SIGNAL都不应park
return false;
}
小问:为神马compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
后仍要返回false,而不是直接返回true?
友情小提示:读者可以从自旋意义这一方面入手考虑
ws==Node.SIGNAL
我们就可以一窥一二,理由有三:ws==Node.SIGNAL
,我们可以确保在安全环境下快速返回,提高效率核心重点:该方法根本作用就是要确保成功地设置前驱节点的SIGNAL状态,以确保前驱节点释放锁后一定能唤醒被park阻塞的后继节点
注意前提:只有在前驱节点为SIGNAL状态下才能park阻塞当前节点
/**
* 线程park进入等待状态,并返回是否被中断唤醒
* @return {@code true} if interrupted 若被中断唤醒返回true,否则false
*/
private final boolean parkAndCheckInterrupt() {
//PS:park后线程进入等待状态,线程阻塞在此处,不会继续往后执行,即不会执行到步骤2
LockSupport.park(this);//步骤1
/**
* 需要注意的是静态方法会清除中断标志
* 想要中断任务的话需要在外部再次设置中断
*
* PS:线程只有被unpark唤醒后才会接着继续执行步骤2
*/
return Thread.interrupted();//步骤2
}
- 关于中断机制读者可参看笔者的 并发番@Thread一文通(1.7版) 6.线程中断
- 关于park读者可参看笔者的 并发番@LockSupport一文通(1.8版)
/**
* 补中断:设置线程中断标志为true
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- 1.状态位变化:初始状态0 -> Node.SIGNAL(作为唤醒后继节点的依据) -> 0 (唤醒后继节点后回归纯真)
- 2.若当前节点的后继节点不存在或是取消状态,需要反向遍历去找最靠近当前节点的非取消状态后继节点
- 3.若存在非取消状态的后继节点就唤醒他,当然找不到就算了(说明没有需要获取锁的节点存在啦)
/**
* 唤醒当前节点的后继节点
* @param node the node 当前节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//重置当前节点标志位
if (ws < 0)
/**
* 若是负数就表示未取消,需要回归纯真状态-> 0
* 允许失败,因为只有取消状态的节点才会被回收
*/
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally just the next node.
* But if cancelled or apparently null,traverse backwards from tail to find
* the actual non-cancelled successor.
*
* 通常node.next就是要被唤醒的后继节点
* 但若next节点被取消或为空,需要反向遍历去找最靠前的非取消状态节点并唤醒她
* waitStatus <= 0 说明未取消,就是节点还想获取锁的意思
* 共享模式时会发生s == null的情况
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//换句话说,当t == null || t == node时停止循环
for (Node t = tail; t != null && t != node; t = t.prev)
//找到同步队列中最靠前(即最靠近node)的想要获取锁的节点
if (t.waitStatus <= 0)
/**
* 小问:为啥是最靠前呢?
*
* 小答:因为在这里会不断把非取消状态的节点赋值给s
* 直到是最靠近node(即最靠前)的时候才停止
* 而不是一旦找到一个非取消状态的节点就停止循环啦!
*/
s = t;
}
/**
* 如果后继节点非空,那就让我们唤醒她吧!
*/
if (s != null)
LockSupport.unpark(s.thread);
}
小问:为神马要沿着prev从tail往前获取非取消节点呢?
友情小提示:读者可以从入队、出队、取消操作入手考虑prev和next的线程安全问题
1.反向遍历:prev赋值线程安全,next赋值非线程安全且在CLH队列不同形态会有不同的表现,判断逻辑会比prev复杂很多
2.跳过取消节点:取消状态节点都是需要被GC回收的节点,因为其放弃了获取锁的机会,正所谓强扭的瓜不甜呀!
小问:有心的读者会发现在unparkSuccessor
方法中其实并没有操作队列,那么线程节点是何时出队的呢?
友情小提示:读者可以回顾一下获取锁的整个过程重新梳理一下
小答:unparkSuccessor
方法主要做了个唤醒操作,真正的出队操作(比如调整prev、next)是在setHead
、shouldParkAfterFailedAcquire
、cancelAcquire
等方法中实际完成的
并发番@AbstractQueuedSynchronizer一文通 由 黄志鹏kira 创作,采用 知识共享 署名-非商业性使用 4.0 国际 许可协议 进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。