@boothsun
2017-07-31T05:27:22.000000Z
字数 4324
阅读 1431
Java多线程
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。CyclicBarrier的作用是让一组线程之间相互等待,任何一个线程到达屏障点后就阻塞,直到最后一个线程到达,才都继续往下执行。个人理解:CyclicBarrier可以看成是一道大门或者关卡,先到的线程会被阻塞在大门口,直到最后一个线程到达屏障时,大门才被打开,所有被阻塞的线程才会继续干活。就像是朋友聚餐,只有最后一个朋友到达时,才会开吃!
CyclicBarrier 的构造函数可以传入一个整数,其含义是屏障可拦截的线程数,每个线程都可调用await方法告诉CyclicBarrier“我已经到达了屏障前”,CyclicBarrier内部是执行+1操作,一旦屏障前阻塞的线程数等于构造函数传入的可拦截线程数时,所有被阻塞的线程都将被唤醒,继续往下执行。
//parties表示屏障前可阻塞的线程数,当阻塞的线程数到达parties时,屏障被打开,所有阻塞的线程将会被唤醒public CyclicBarrier(int parties);// 此构造方法不同于上面的是在屏障被打开时将优先执行barrierAction,方便处理更负责的业务场景public CyclicBarrier(int parties, Runnable barrierAction) ;// 等待屏障的打开public int await() throws InterruptedException,BrokenBarrierException ;//等待屏障的打开 超时会抛出 TimeoutExceptionpublic int await(long timeout, TimeUnit unit) throwsInterruptedException,BrokenBarrierException,TimeoutException ;// 返回在屏障前等待的线程数public int getNumberWaiting() ;// 获取 当前屏障初始化时 可阻塞的线程数public int getParties() ;/*** 作用:* 1. 查询此屏障是否处于损坏状态。* 产生损坏状态的原因:* 1. 由于超时或者屏障重置(reset)* 2. 某个屏障操作抛出异常*/public boolean isBroken() ;<div class="md-section-divider"></div>
模拟 聚餐场景
public class CyclicBarrierTest {public static void main(String[] args) throws Exception{ExecutorService service = Executors.newFixedThreadPool(3) ;CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{System.out.println("全都到了 【开吃!】");}) ;for(int i = 0 ; i < 3 ; i++) {final int number = i ;service.execute(()->{try {System.out.println("编号:" + number + "开始出发 【去聚餐】");Thread.sleep((int)(Math.random() * 10000));System.out.println("编号:" + number + " 【到达聚餐地点】");cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}});}service.shutdown();}}<div class="md-section-divider"></div>
循环使用指的是在大门被打开后,可以再次关闭;即再让之前指定数目的线程在屏障前阻塞等待,然后再次打开大门。
方法reset()的作用就是重置屏障,以保证循环使用。
// 将屏障重置为其初始化状态即重置为构造函数传入的parties值。public void reset()<div class="md-section-divider"></div>
import java.util.concurrent.CyclicBarrier;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;/*** Created by 58 on 2017-7-14.*/public class CyclicBarrierTest {public static void main(String[] args) throws Exception{ThreadPoolExecutor service = (ThreadPoolExecutor)Executors.newFixedThreadPool(3) ;CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{System.out.println("全都到了 【开吃!】");}) ;for(int i = 0 ; i < 3 ; i++) {final int number = i ;service.execute(()->{try {System.out.println("编号:" + number + "开始出发 【去聚餐】");Thread.sleep((number+2) * 1000);System.out.println("编号:" + number + " 【到达聚餐地点】");cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}});}Thread.sleep( 3 * 1000);System.out.println(cyclicBarrier.getNumberWaiting());cyclicBarrier.reset();System.out.println(cyclicBarrier.getNumberWaiting());}}
运行截图

关于reset方法的说明 :
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets after
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
上面是JDK中reset方法的注释,简单翻译成中文如下:
将屏障重置为其初始状态。此时,如果有任何的一个参与者正在屏障前等待,它将会返回一个 BrokenBarrierException异常。注意:如果因为其他原因使屏障发生损坏,此时屏障的重置将会变得很复杂;为了将来的使用,相比需要考虑使用其他方式重新同步线程,并选择其中一个线程来执行重置,更好的解决办法是创建一个新的屏障。
从上面的翻译,我们可以得出结论:
`BrokenBarrierException异常。关于BrokenBarrierException异常的说明:
/**
* Exception thrown when a thread tries to wait upon a barrier that is
* in a broken state, or which enters the broken state while the thread
* is waiting.
*/
上面是JDK中BrokenBarrierException方法的注释,简单翻译成中文如下:
当某个线程试图在一个已经处于损坏状态的屏障前等待或者在线程等待时屏障进入了损坏状态 将会抛出此异常。
CountDownLatch一般用于某个线程A等待若干其他线程执行完任务之后,它才执行。而CyclicBarrier一般用于多个线程之间相互等待,比如需要在一个同时执行时间点上达成一致,然后同时开启一项工作;重点是“多个线程之间” 任何一个线程没有完成任务,则其他所有的线程都必须等待。
CyclicBarrier的reset方法可以重置屏障。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction来汇总这些线程的计算结果,计算出整个Excel的日均银行流水。
下面的注释片段是JDK源码中,关于CyclicBarrier的介绍。
*A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This barrier action is useful
* for updating shared-state before any of the parties continue.
*
after the last thread in the party arrives , but before any threads are released 这句明确说明barrierAction的执行时机:在最后一个线程到达后,但是在所有线程被释放运行前。