原创

Java并发工具之CountDownLatch


一、简介

CountDownLatch允许一个或多个线程等待其他线程完成操作。

CountDownLatch,可以理解为倒计时工具,类似“三二一,芝麻开门”。该工具用于一组操作执行完成才能执行后续操作的场景。例如项目小组的晨会,需要等到所有人齐了才能正式开会。

CountDownLatch是通过一个计数器来实现的,当我们在new一个CountDownLatch对象的时候初始化计数器的值,该值也代表了需要等待任务的完成数。每当一个线程完成自己的任务后,计数器的值就会减1。当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了

二、类总览

1. 核心属性

CountDownLatch主要是通过AQS的共享锁机制实现的,因此它的核心属性只有一个sync

private final Sync sync;

我们来看一下Sync内部类的源码

private static final class Sync extends AbstractQueuedSynchronizer {
    //序列化版本号
    private static final long serialVersionUID = 4982264981922014374L;

    //构造函数
    Sync(int count) {
        setState(count);
    }

    //获取计数值
    int getCount() {
        return getState();
    }

    //重写AQS的方法,实现共享锁的获取
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    //重写AQS的方法,实现共享锁的释放
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

它继承自AQS,同时重写了tryAcquireSharedtryReleaseShared,以完成具体的实现共享锁的获取与释放的逻辑。方法的细节下面介绍。

2. 构造函数

public CountDownLatch(int count) {
    //传递的计数值不能小于0
    if (count < 0) throw new IllegalArgumentException("count < 0");
    //初始化sync,即初始化state值
    this.sync = new Sync(count);
}

三、使用案例

以简介中说到的开会为例,如果参会人员一共10人,需要等到10人入场会议才开始,代码如下(先混个脸熟,熟悉一下基本用法

public static void main(String[] args) throws Exception {
    //参会人数
    int threadNum = 10;
    //创建countDownLatch,传递计数值为参会人数
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    System.out.println("组织开会,一共有" + threadNum + "名员工参加会议。");
    //员工入场
    for (int i = 1; i <= threadNum; i++) {
        new Thread(() ->
            {
                System.out.println(Thread.currentThread().getName() + "号员工到达会场。");
                //计数减一
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
    }
    //老板等待员工入场
    countDownLatch.await();
    System.out.println("人到齐了,开会吧。");
}

执行结果如下

组织开会,一共有10名员工参加会议。
1号员工到达会场。
2号员工到达会场。
3号员工到达会场。
4号员工到达会场。
6号员工到达会场。
5号员工到达会场。
7号员工到达会场。
9号员工到达会场。
10号员工到达会场。
8号员工到达会场。
人到齐了,开会吧。

主线程调用await方法等待子线程的执行结束,每个子线程执行结束后调用方法countDown,当计数减为0时,主线程就会从await方法返回,继续执行。

四、核心方法分析

1. countDown

CountDownLatch提供countDown方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程

public void countDown() {
    sync.releaseShared(1);
}

调用releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

这个AQS中的释放共享锁的方法,它会调用子类重写的tryReleaseShared方法,代码如下

protected boolean tryReleaseShared(int releases) {
    //自旋
    for (;;) {
        //获取同步状态
        int c = getState();
        //如果同步状态为0,说明计数已经为0了,释放失败,直接返回false
        if (c == 0)
            return false;
        //否则计算释放后的同步状态
        int nextc = c-1;
        //CAS修改同步状态,修改成功则退出自旋,否则继续自旋
        if (compareAndSetState(c, nextc))
            //返回更新后的同步状态是否为0
            return nextc == 0;
    }
}

根据分析可得,该方法只有在count值原来大于0,经过调用后变为0,才会返回true,否则返回false返回true代表前面所有的任务都完成了,此时会满足if条件,会调用方法doReleaseShared尝试唤醒被阻塞的线程doReleaseShared方法在前面AQS的详解(深入理解AQS实现原理)中已经说过,这么不再赘述。

值得一提的是,我们其实并不关心releaseShared的返回值,而只关心tryReleaseShared的返回值,这里更像是借了共享锁的“壳”,来完成我们的目的

2. await

CountDownLatch提供await()方法来使当前线程在计数至零之前一直等待,除非线程被中断。它与Condition的await方法的语义相同,该方法是阻塞式地等待,并且是响应中断的,只不过它不是在等待signal操作,而是在等待count值为0。代码如下

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

调用AQS中的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

首先响应中断,然后调用子类的tryAcquireShared方法

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

该方法的逻辑相当简单,没有任何抢锁的行为,没有任何CAS操作,只是简单判断同步状态是否为0,是返回1,否则返回-1

  • 如果状态等于0,说明前面的任务都已经完成了,这里没必要阻塞挂起了,因此返回1,不进入方法doAcquireSharedInterruptibly,直接结束。
  • 如果状态不等于0,即大于0,说明任务未完成,当前线程需要入队阻塞挂起,并自旋判断state是否等于0。

五、总结

  • CountDownLatch内部通过共享锁实现。
  • 在创建CountDownLatch实例时,需要传递一个int型的参数,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。
  • 当某个线程调用await方法,程序首先判断count的值是否为0如果count不是0的话则会入CLH队后挂起,然后等待唤醒后自旋直到count为0为止
  • 当其他线程调用countDown方法时,则会使count值减一(如果是0则不减)当count值原来不为0,操作后等于0的情况下,就会主动去唤醒CLH队列的第一个被阻塞的线程
Java
Java并发
  • 作者:贤子磊
  • 发表时间:2020-09-25 01:02
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 评论

    您需要登录后才可以评论