前言:JUC包,CountDownLatch源码学习
简介
count down 倒计时,latch 门闩。CountDownLatch是一个同步类工具,用于线程通信
CountDownLatch能够使一个线程在等待其他线程完成之后,再执行。使用一个计数器进行实现,计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
使用场景
- 某一线程等N个线程后执行
- N个线程在某个操作后并行执行
常用方法
countDown:计数-1
await:线程堵塞直到计数为0
子进程例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CountDownLatch ready = new CountDownLatch(10); for (int i = 0; i < 10; i++) { executorService.execute(new Runnable() { @Override public void run() {
System.out.println(Thread.currentThread().getName() + " 子线程执行结束"); ready.countDown(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }); } ready.await(); System.out.println("所有子线程结束,Main执行后续方法");
executorService.shutdown(); } }
|
运动员例子
10个运动员,都准备好了后,裁判开发令枪,运动元同时开跑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CountDownLatch startGun = new CountDownLatch(1); CountDownLatch ready = new CountDownLatch(10); for (int i = 0; i < 10; i++) { executorService.execute(new Runnable() { @Override public void run() { try { ready.countDown(); System.out.println(Thread.currentThread().getName() + " 准备好了"); startGun.await(); System.out.println(Thread.currentThread().getName() + " 开跑"); Thread.sleep((long) (Math.random() * 100)); System.out.println(Thread.currentThread().getName() + " 到达终点"); } catch (InterruptedException e) { e.printStackTrace(); }
} }); } ready.await(); System.out.println("所有运动员都准备好了,裁判开发令枪"); startGun.countDown(); executorService.shutdown(); } }
|
源码
Sync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
|
CountDownLatch
构造函数,初始化state
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
await
调用AQS acquireSharedInterruptibly方法获取资源,根据上面Sync的tryAcquireShared方法,当state==0时才能成功获取资源,否则就失败,进入AQS CLH队列,响应中断
1 2 3
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
|
限时await
1 2 3 4
| public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
|
countDown
释放1个资源
1 2 3
| public void countDown() { sync.releaseShared(1); }
|
getCount
返回state
1 2 3
| public long getCount() { return sync.getCount(); }
|
toString
转换为String格式
1 2 3
| public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }
|
总结
通过上述源码可知,CountDownLatch是一个共享锁,初始化锁住所有共享资源,每次countDown释放一个资源,当所有资源均被释放,被await的线程就能运行。
但是也能看出,CountDownLatch是一次性的,不能重复使用,计数为0就无法被重置。