前言:JUC包,CyclicBarrier源码学习
简介
Cyclic 循环 Barrier 屏障,CyclicBarrier和CountDownLatch很相似,但CyclicBarrier功能更强,CyclicBarrier能设置一个屏障,让一组线程到达屏障时被阻塞,直到最后一个线程到达屏障时,屏障被破坏,所有被屏障拦截的线程继续执行,
常用方法
await:到达屏障并等待,当所有线程都到达时,再继续执行。注意一般情况下,如果有10个线程,屏障为5,会5个线程同时执行,然后剩下5个再同时执行的情况
,因为当屏障满足到达线程数目时,屏障破碎并进入下一代重新恢复屏障
简单实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public class CyclicBarrierTest { private static final int THREADCOUNT = 10; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(THREADCOUNT, THREADCOUNT, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < THREADCOUNT; i++) { threadPool.execute(() -> { try { System.out.println(Thread.currentThread().getName() + "is ready"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "isFinish"); } catch (Exception e) { e.printStackTrace(); }
}); } threadPool.shutdown(); } }
|
源码
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
|
构造函数
CyclicBarrier提供了两个构造函数,必须制定parties且大于0
1 2 3
| public CyclicBarrier(int parties) { this(parties, null); }
|
1 2 3 4 5 6
| public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
|
Generation
静态内部类
1 2 3
| private static class Generation { boolean broken = false; }
|
await
await由调用dowait方法实现的,两个参数分别代表是否等待和等待的时长。
1 2 3 4 5 6 7
| public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } }
|
限时await
1 2 3 4 5 6
| public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
|
breakBarrier
破坏屏障,dowait、reset中被调用
1 2 3 4 5 6
| private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
|
dowait
dowait是CyclicBarrier类的重点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
|
getNumberWaiting
获取当前到达屏障的线程数即parties - count
1 2 3 4 5 6 7 8 9
| public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
|
getParties
返回parties
1 2 3
| public int getParties() { return parties; }
|
isBroken
判断当前屏障是否被破坏
1 2 3 4 5 6 7 8 9
| public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
|
nextGeneration
开启下一代,dowait、reset中被调用。跟breakBarrier方法相比,都会唤醒线程并复位count,区别在于nextGeneration会重建新的generation,屏障被恢复(新的线程await会再次等待),但breakBarrier会将generation.broken设置为true,表示屏障被破坏(新的线程await会抛出异常)
1 2 3 4 5 6 7 8
| private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
|
reset
重置
1 2 3 4 5 6 7 8 9 10 11 12
| public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } }
|
总结
- CyclicBarrier中的屏障是一种抽象概念,由计数器count和布尔broken表示,当屏障未被破坏时,调用Condition方法堵塞当前线程并将其加入Condition队列
- CyclicBarrier使用ReentrantLock来控制dowait、getNumberWaiting、isBroken、reset等方法的执行,以上方法执行都需要获取独占锁,不能同时执行上述方法
- CyclicBarrier到指定N个线程到达后,会唤醒所有等待的线程并进入下一代(上一代的线程可以继续执行,下一代的线程会有屏障拦截),使用起来非常简单,只需要会用await这一个方法就行