juc之CountDownLatch

介绍

  根据文档:允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
  一般被称为计数器,可以用做流程控制之类的作用。通常用来让一个线程等待其他的完成过后再继续执行。
  通过一个计数器实现,计数器的初始值可以是线程的数量,每当一个线程执行完,计数器的值减一,当计数器的值为0时,表示所有线程都执行完毕,等待的线程就可以继续执行了。


使用模式

  一般来说有两种使用模式:

  1. 作为启动信号,让多个线程等待
  2. 作为结束信号,让单个线程等待

作为启动信号,让多个线程等待

  实现多个线程开始执行任务的最大并行性,强调的是多个线程在某一时刻同时开始执行。
  做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
  将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testone() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//准备完毕……运动员都阻塞在这,等待号令
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + ",时间:"+ System.currentTimeMillis() + "】";
System.out.println(parter + "开始执行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

Thread.sleep(2000);
System.out.println("发令开始.....");
countDownLatch.countDown();
}

  运行结果:

1
2
3
4
5
6
发令开始.....
【Thread-0,时间:1623897582895】开始执行……
【Thread-4,时间:1623897582895】开始执行……
【Thread-3,时间:1623897582895】开始执行……
【Thread-2,时间:1623897582895】开始执行……
【Thread-1,时间:1623897582895】开始执行……

作为结束信号,让一个线程等待

  某个线程在开始运行前等待n个线程执行完毕。例如,启动一个服务时,主线程需要等待多个组件加载完毕,之后继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testtwo() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
System.out.println("finish," + index + "," + Thread.currentThread().getName());
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}

  运行结果:

1
2
3
4
5
6
finish,4,Thread-4
finish,2,Thread-2
finish,3,Thread-3
finish,0,Thread-0
finish,1,Thread-1
主线程:在所有任务运行完成后,进行结果汇总

官方demo,可以参考

  demo1,这是一个将启动信号和结束信号结合起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();

doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}

class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}

  典型的主线程等待子线程完成的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...

for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));

doneSignal.await(); // wait for all to finish
}
}

class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}

原理

  简单来说,是通过一个计数器来实现的,计数器的初始值为线程的数量或者指定的值,调用await()方法的线程会被阻塞,直到计数器减到0时才能继续往下执行。
  CountDownLatch底层基于AQS实现,构造函数中指定的count直接赋阻值给AQS的state,每次countDown都是release(1)减一,最后减到0时unpark阻塞线程。而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 这里表示如果资源已经释放,就不能在释放一次
if (c == 0)
return false;
// 本次计数剩余量
int nextc = c-1;
// 设置剩余量
if (compareAndSetState(c, nextc))
// 等于0表示,本次计数完成后,释放资源了,await方法就不再阻塞
return nextc == 0;
}
}
}
1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
1
2
3
4
5
6
7
8
9
10
11
12
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

  这里调用tryAcquireShared方法尝试获取资源,返回负数表示失败,返回整数则表示成功,失败了就加入同步队列。
  tryAcquireShared方法中则根据计数来判断是否成功,如果计数为0,则计数完毕,返回成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void countDown() {
sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
// 如果资源释放了,唤醒同步队列中的线程
if (tryReleaseShared(arg)) {
// unpark
doReleaseShared();
return true;
}
return false;
}

  这里面会尝试进行资源释放,如果资源释放了就唤醒同步队列中的线程。

大概流程

  1. 创建一个CountDownLatch,并赋予一个数值,这个值表示需要计数的次数,每次countdown算一次。
  2. 在主线程调用await方法,表示计数器完成前都不能动,await方法的内部实现依赖于内部的AQS,调用await方法的时候会尝试去获取资源,成功条件是state=0,也就是说除非countdown了count次之后,才能成功,失败的话当前线程进行休眠。
  3. 在子线程调用countDown方法,每次调用都会使内部的state-1,state为0时资源释放,await方法不在阻塞。

参考资料 & 鸣谢