0%

CyclicBarrier源码学习

前言:JUC包,CyclicBarrier源码学习

简介

Cyclic 循环 Barrier 屏障,CyclicBarrier和CountDownLatch很相似,但CyclicBarrier功能更强,CyclicBarrier能设置一个屏障,让一组线程到达屏障时被阻塞,直到最后一个线程到达屏障时,屏障被破坏,所有被屏障拦截的线程继续执行,

常用方法

await:到达屏障并等待,当所有线程都到达时,再继续执行。注意一般情况下,如果有10个线程,屏障为5,会5个线程同时执行,然后剩下5个再同时执行的情况
,因为当屏障满足到达线程数目时,屏障破碎并进入下一代重新恢复屏障

简单实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @author shency
* @description: TODO
* @date: 2019/12/3
*/
public class CyclicBarrierTest {
private static final int THREADCOUNT = 10;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(THREADCOUNT, THREADCOUNT, 30,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < THREADCOUNT; i++) {
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "is ready");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "isFinish");
} catch (Exception e) {
e.printStackTrace();
}

});
}
threadPool.shutdown();
}
}

源码

image-20200105210200575

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 独占锁对象
private final ReentrantLock lock = new ReentrantLock();

// Condition队列
private final Condition trip = lock.newCondition();

// 表示需要有parties个线程到达屏障,屏障破坏
private final int parties;

// 执行线程
private final Runnable barrierCommand;

// 代数,generation对象有一个broken属性,broken==true表示当前屏障被破坏
private Generation generation = new Generation();

// 计数器,表示目前还需要count个线程屏障才会破坏
private int count;

构造函数

CyclicBarrier提供了两个构造函数,必须制定parties且大于0

1
2
3
public CyclicBarrier(int parties) {
this(parties, null);
}
1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

Generation

静态内部类

1
2
3
private static class Generation {
boolean broken = false;
}

await

await由调用dowait方法实现的,两个参数分别代表是否等待和等待的时长。

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

限时await

1
2
3
4
5
6
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

breakBarrier

破坏屏障,dowait、reset中被调用

1
2
3
4
5
6
private void breakBarrier() {
generation.broken = true;
count = parties;
// 唤醒Condition队列中所有线程
trip.signalAll();
}

dowait

dowait是CyclicBarrier类的重点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 当屏障被破坏并且没有进入下一代,新的线程await,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 响应线程中断,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 有一个新的线程到达屏障,count--
int index = --count;
// count==0,表示所有线程均已到达屏障
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果设置了command,则屏障被破坏时会执行它
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 开启下一代
nextGeneration();
return 0;
} finally {
// command.run()出现问题,则执行breakBarrier
if (!ranAction)
breakBarrier();
}
}

// 循环,直到线程被唤醒或中断或异常
for (;;) {
try {
// 不限时,调用AQS内部类ConditionObject类的await方法,将当前线程堵塞并加入到Condition队列,直到被Signal方法唤醒
if (!timed)
trip.await();
// 限时,调用AQS内部类ConditionObject类的awaitNanos方法,将当前线程堵塞并加入到Condition队列,直到被Signal方法唤醒或者到达指定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果在同一代并且屏障未被破坏时线程中断,则破坏屏障并抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 进入到这种情况,说明线程中断太晚了,,generation已更新或破坏,只能设置线程中断标识了
Thread.currentThread().interrupt();
}
}
// 屏障被破坏,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 屏障没有被破坏,检测generation对象,如果是上一代的线程就可以继续运行
if (g != generation)
return index;
// 超时,破坏屏障并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

getNumberWaiting

获取当前到达屏障的线程数即parties - count

1
2
3
4
5
6
7
8
9
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

getParties

返回parties

1
2
3
public int getParties() {
return parties;
}

isBroken

判断当前屏障是否被破坏

1
2
3
4
5
6
7
8
9
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

nextGeneration

开启下一代,dowait、reset中被调用。跟breakBarrier方法相比,都会唤醒线程并复位count,区别在于nextGeneration会重建新的generation,屏障被恢复(新的线程await会再次等待),但breakBarrier会将generation.broken设置为true,表示屏障被破坏(新的线程await会抛出异常)

1
2
3
4
5
6
7
8
private void nextGeneration() {
// 唤醒所有线程
trip.signalAll();
// 复位count
count = parties;
// 更新generation对象
generation = new Generation();
}

reset

重置

1
2
3
4
5
6
7
8
9
10
11
12
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 破坏屏障
breakBarrier(); // break the current generation
// 开启下一代
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

总结

  1. CyclicBarrier中的屏障是一种抽象概念,由计数器count和布尔broken表示,当屏障未被破坏时,调用Condition方法堵塞当前线程并将其加入Condition队列
  2. CyclicBarrier使用ReentrantLock来控制dowait、getNumberWaiting、isBroken、reset等方法的执行,以上方法执行都需要获取独占锁,不能同时执行上述方法
  3. CyclicBarrier到指定N个线程到达后,会唤醒所有等待的线程并进入下一代(上一代的线程可以继续执行,下一代的线程会有屏障拦截),使用起来非常简单,只需要会用await这一个方法就行
-------------本文结束感谢您的阅读-------------