[关闭]
@boothsun 2017-07-26T20:00:23.000000Z 字数 3628 阅读 1517

CountDownLatch源码浅析

Java多线程


参考好文:

前言

CountDownLatch用于同步一个或者多个任务,强制他们等待有其他任务执行的一组操作完成。CountDownLatch典型的用法是将一个程序分成n个相互独立的可分解任务,并创建值为n个互相独立的可分解任务,并创建值为n的CountDownLatch。当每一个任务完成时,都会在调用countdown方法将count-1,等待问题被解决的任务调用这个锁的await,将自身拦截等待,直到锁计数为0,才继续往下执行。

前沿知识

CAS

CAS:Compare and Swap 可以翻译成比较并交换。
Excample:
CAS操作包含三个操作数 --- 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值想匹配,那么处理器会自动将该位置值更新为新值;否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。(在CAS的一些特殊情况下将仅返回CAS是否成功,而不提前当前值。)CAS有效地说明了”我认为位置V的值应该为值A,如果等于该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可“。

通常将CAS用于以同步的方式从地址V读取值A,执行多步计算来获得新值B,然后使用CAS将V的值从A改为B。如果V处的值尚未被更改,则CAS操作成功。

线程队列

CountDownLatch是为了实现公平锁,采用了队列的形式,保证先到先执行。

从构造函数CountDownLatch(int count)说起

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

说明: 构造函数创建了一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下:

  1. Sync(int count) {
  2. setState(count);
  3. }
  4. protected final void setState(int newState) {
  5. state = newState;
  6. }

说明:上面代码中的state就是CountDownLatch的“锁计数器”,每次调用CountDownLatch的countdown()方法都是通过CAS的方式将state减1,直到state为0。

countDown()方法:

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  4. public final boolean releaseShared(int arg) {
  5. if (tryReleaseShared(arg)) {
  6. doReleaseShared();
  7. return true;
  8. }
  9. return false;
  10. }
  11. protected boolean tryReleaseShared(int releases) {
  12. for (;;) {
  13. int c = getState();
  14. if (c == 0)
  15. return false;
  16. int nextc = c-1;
  17. if (compareAndSetState(c, nextc))
  18. return nextc == 0;
  19. }
  20. }

说明: countDown -> releaseShared -> tryReleaseShared的作用是释放占用的锁资源;从for(;;) {}的形式,可以看出其内部是通过自旋的形式,一直尝试释放锁,直到当前锁状态为0或者释放成功。compareAndSetState(c, nextc) 可以看出其是通过CAS的方式保证原子性的释放锁。

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. //头节点不为null且不等于尾节点 即头节点是实际有意义的节点
  5. if (h != null && h != tail) {
  6. int ws = h.waitStatus;
  7. // Node.SIGNAL:waitStatus value to indicate successor's thread needs unparking
  8. // 即:当头节点的状态为-1时,表示需要唤醒后面的进程。
  9. if (ws == Node.SIGNAL) {
  10. // 通过CAS的形式对head的状态进行更新,更新失败就continue,直到成功。
  11. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  12. continue; // loop to recheck cases
  13. unparkSuccessor(h); // 唤醒后续的进程
  14. }
  15. else if (ws == 0 &&
  16. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  17. continue; // loop on failed CAS
  18. }
  19. if (h == head) // loop if head changed
  20. break;
  21. }
  22. }

说明: doReleaseShared的作用主要是在释放共享锁成功后,通知后面的节点。

await()方法

如果这里多个线程wait且当前共享锁还不为0则这些wait线程的状态如下图:

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  4. public final void acquireSharedInterruptibly(int arg)
  5. throws InterruptedException {
  6. if (Thread.interrupted()) // 判断线程的状态,如果线程已经状态 就抛出异常
  7. throw new InterruptedException();
  8. if (tryAcquireShared(arg) < 0)
  9. doAcquireSharedInterruptibly(arg);
  10. }
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }

说明: getState()返回的是当前CountDownLatch对象剩余的共享锁数量。从tryAcquireShared方法的返回值可以看出:当共享锁的数量不为0时,执行doAcquireSharedInterruptibly(arg),即当前调用await的线程需要加入到等待共享锁释放的队列而不是可以立即往下执行时,进入到doAcquireSharedInterruptibly(arg)方法进行等待。

  1. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  2. //由于采用了公平锁,所以要将节点放到队列里,保证先到先执行。
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. for (;;) { //开启自旋模式 一直等待,直到锁被全部释放。
  7. final Node p = node.predecessor();//获取当前节点的前继节点(node.prev)
  8. // 如果当前节点的前继节点(node.prev)等于head 头节点,则表示当前节点是等待队列中
  9. // 的第一个节点。满足 先到先执行的 公平顺序,则尝试释放锁。
  10. if (p == head) {
  11. //再次获取 当前剩余的共享锁数量是否为0,如果为0,则表示可以进行后续的唤醒动作
  12. int r = tryAcquireShared(arg); // (getState() == 0) ? 1 : -1;
  13. if (r >= 0) { // 满足唤醒的条件 开始进行唤醒动作
  14. // 将当前的node设置为head,也就是模拟当前node出队列动作
  15. // 同时唤醒当前出队列的node节点。
  16. setHeadAndPropagate(node, r);
  17. p.next = null; // help GC
  18. failed = false;
  19. return;
  20. }
  21. }
  22. //校验线程是否被打断,如果被打断则抛出异常
  23. if (shouldParkAfterFailedAcquire(p, node) &&
  24. parkAndCheckInterrupt())
  25. throw new InterruptedException();
  26. }
  27. } finally {
  28. if (failed)
  29. cancelAcquire(node);
  30. }
  31. }

说明:
1. 当有多个线程执行await()方法,且共享锁的数量尚未等于0的情况下,等待队列如下图(0先到 1、2、3依次)

2. doAcquireSharedInterruptibly方法的执行逻辑如下图,特别要说明的是Thread0 Thread1 Thread2 Thread3内部都并行在执行下图逻辑,不断校验自己的前驱节点是否为head,自身是否为中断。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注