0%

CountDownLatch源码学习

前言:JUC包,CountDownLatch源码学习

简介

count down 倒计时,latch 门闩。CountDownLatch是一个同步类工具,用于线程通信

CountDownLatch能够使一个线程在等待其他线程完成之后,再执行。使用一个计数器进行实现,计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

使用场景

  1. 某一线程等N个线程后执行
  2. N个线程在某个操作后并行执行

常用方法

countDown:计数-1
await:线程堵塞直到计数为0

子进程例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @author shency
* @description: TODO
* @date: 2019/12/2
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch ready = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
/**
* 省略子线程执行内容
*/
System.out.println(Thread.currentThread().getName() + " 子线程执行结束");
ready.countDown();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
ready.await();
System.out.println("所有子线程结束,Main执行后续方法");
/**
* 省略父线程执行内容
*/
executorService.shutdown();
}
}

运动员例子

10个运动员,都准备好了后,裁判开发令枪,运动元同时开跑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @author shency
* @description: TODO
* @date: 2019/12/2
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch startGun = new CountDownLatch(1);
CountDownLatch ready = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ready.countDown();
System.out.println(Thread.currentThread().getName() + " 准备好了");
startGun.await();
System.out.println(Thread.currentThread().getName() + " 开跑");
Thread.sleep((long) (Math.random() * 100));
System.out.println(Thread.currentThread().getName() + " 到达终点");
} catch (InterruptedException e) {
e.printStackTrace();
}

}
});
}
ready.await();
System.out.println("所有运动员都准备好了,裁判开发令枪");
startGun.countDown();
executorService.shutdown();
}
}

源码

image-20200105215213113

Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 设置state
Sync(int count) {
setState(count);
}
// 返回state
int getCount() {
return getState();
}
// 尝试获取资源,返回1代表可以获取资源,返回-1代表失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放资源
protected boolean tryReleaseShared(int releases) {
// CAS将state-1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch

构造函数,初始化state

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

await

调用AQS acquireSharedInterruptibly方法获取资源,根据上面Sync的tryAcquireShared方法,当state==0时才能成功获取资源,否则就失败,进入AQS CLH队列,响应中断

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

限时await

1
2
3
4
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown

释放1个资源

1
2
3
public void countDown() {
sync.releaseShared(1);
}

getCount

返回state

1
2
3
public long getCount() {
return sync.getCount();
}

toString

转换为String格式

1
2
3
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}

总结

通过上述源码可知,CountDownLatch是一个共享锁,初始化锁住所有共享资源,每次countDown释放一个资源,当所有资源均被释放,被await的线程就能运行。

但是也能看出,CountDownLatch是一次性的,不能重复使用,计数为0就无法被重置。

-------------本文结束感谢您的阅读-------------