前言:JUC包,CyclicBarrier源码学习
简介
Cyclic 循环 Barrier 屏障,CyclicBarrier和CountDownLatch很相似,但CyclicBarrier功能更强,CyclicBarrier能设置一个屏障,让一组线程到达屏障时被阻塞,直到最后一个线程到达屏障时,屏障被破坏,所有被屏障拦截的线程继续执行,
常用方法
await:到达屏障并等待,当所有线程都到达时,再继续执行。注意一般情况下,如果有10个线程,屏障为5,会5个线程同时执行,然后剩下5个再同时执行的情况
,因为当屏障满足到达线程数目时,屏障破碎并进入下一代重新恢复屏障
简单实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | 
 
 
 
  public class CyclicBarrierTest {     private static final int THREADCOUNT = 10;     private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
      public static void main(String[] args) {         ExecutorService threadPool = new ThreadPoolExecutor(THREADCOUNT, THREADCOUNT, 30,                 TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());         for (int i = 0; i < THREADCOUNT; i++) {             threadPool.execute(() -> {                 try {                     System.out.println(Thread.currentThread().getName() + "is ready");                     cyclicBarrier.await();                     System.out.println(Thread.currentThread().getName() + "isFinish");                 } catch (Exception e) {                     e.printStackTrace();                 }
              });         }         threadPool.shutdown();     } }
 
  | 
 
源码

属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   |  private final ReentrantLock lock = new ReentrantLock();
 
  private final Condition trip = lock.newCondition();
 
  private final int parties;
 
  private final Runnable barrierCommand;
 
  private Generation generation = new Generation();
 
  private int count;
 
  | 
 
构造函数
CyclicBarrier提供了两个构造函数,必须制定parties且大于0
1 2 3
   | public CyclicBarrier(int parties) {     this(parties, null); }
  | 
 
1 2 3 4 5 6
   | public CyclicBarrier(int parties, Runnable barrierAction) {     if (parties <= 0) throw new IllegalArgumentException();     this.parties = parties;     this.count = parties;     this.barrierCommand = barrierAction; }
  | 
 
Generation
静态内部类
1 2 3
   | private static class Generation {     boolean broken = false; }
  | 
 
await
await由调用dowait方法实现的,两个参数分别代表是否等待和等待的时长。
1 2 3 4 5 6 7
   | public int await() throws InterruptedException, BrokenBarrierException {     try {         return dowait(false, 0L);     } catch (TimeoutException toe) {         throw new Error(toe);      } }
  | 
 
限时await
1 2 3 4 5 6
   | public int await(long timeout, TimeUnit unit)     throws InterruptedException,            BrokenBarrierException,            TimeoutException {     return dowait(true, unit.toNanos(timeout)); }
   | 
 
breakBarrier
破坏屏障,dowait、reset中被调用
1 2 3 4 5 6
   | private void breakBarrier() {     generation.broken = true;     count = parties;          trip.signalAll(); }
  | 
 
dowait
dowait是CyclicBarrier类的重点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
   | private int dowait(boolean timed, long nanos)     throws InterruptedException, BrokenBarrierException,            TimeoutException {     final ReentrantLock lock = this.lock;     lock.lock();     try {         final Generation g = generation;                  if (g.broken)             throw new BrokenBarrierException();                  if (Thread.interrupted()) {             breakBarrier();             throw new InterruptedException();         }                  int index = --count;                  if (index == 0) {               boolean ranAction = false;             try {                                  final Runnable command = barrierCommand;                 if (command != null)                     command.run();                 ranAction = true;                                  nextGeneration();                 return 0;             } finally {                                  if (!ranAction)                     breakBarrier();             }         }
                   for (;;) {             try {                                  if (!timed)                     trip.await();                                  else if (nanos > 0L)                     nanos = trip.awaitNanos(nanos);             } catch (InterruptedException ie) {                                  if (g == generation && ! g.broken) {                     breakBarrier();                     throw ie;                 } else {                                          Thread.currentThread().interrupt();                 }             }                          if (g.broken)                 throw new BrokenBarrierException();                          if (g != generation)                 return index;                          if (timed && nanos <= 0L) {                 breakBarrier();                 throw new TimeoutException();             }         }     } finally {         lock.unlock();     } }
   | 
 
getNumberWaiting
获取当前到达屏障的线程数即parties - count
1 2 3 4 5 6 7 8 9
   | public int getNumberWaiting() {     final ReentrantLock lock = this.lock;     lock.lock();     try {         return parties - count;     } finally {         lock.unlock();     } }
  | 
 
getParties
返回parties
1 2 3
   | public int getParties() {     return parties; }
  | 
 
isBroken
判断当前屏障是否被破坏
1 2 3 4 5 6 7 8 9
   | public boolean isBroken() {     final ReentrantLock lock = this.lock;     lock.lock();     try {         return generation.broken;     } finally {         lock.unlock();     } }
  | 
 
nextGeneration
开启下一代,dowait、reset中被调用。跟breakBarrier方法相比,都会唤醒线程并复位count,区别在于nextGeneration会重建新的generation,屏障被恢复(新的线程await会再次等待),但breakBarrier会将generation.broken设置为true,表示屏障被破坏(新的线程await会抛出异常)
1 2 3 4 5 6 7 8
   | private void nextGeneration() {          trip.signalAll();          count = parties;          generation = new Generation(); }
  | 
 
reset
重置
1 2 3 4 5 6 7 8 9 10 11 12
   | public void reset() {     final ReentrantLock lock = this.lock;     lock.lock();     try {                  breakBarrier();                     nextGeneration();      } finally {         lock.unlock();     } }
  | 
 
总结
- CyclicBarrier中的屏障是一种抽象概念,由计数器count和布尔broken表示,当屏障未被破坏时,调用Condition方法堵塞当前线程并将其加入Condition队列
 
- CyclicBarrier使用ReentrantLock来控制dowait、getNumberWaiting、isBroken、reset等方法的执行,以上方法执行都需要获取独占锁,不能同时执行上述方法
 
- CyclicBarrier到指定N个线程到达后,会唤醒所有等待的线程并进入下一代(上一代的线程可以继续执行,下一代的线程会有屏障拦截),使用起来非常简单,只需要会用await这一个方法就行