java多线程
countDownLatch介绍
概念
countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
源码
countDownLatch类中只提供了一个构造器:
//参数count为计数值
public CountDownLatch(int count) { };
三个重要方法:
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { };
使用示例:
public void multiThreadTest() {
final CountDownLatch begin = new CountDownLatch(1); // 控制线程开始
final CountDownLatch end = new CountDownLatch(30); // 控制线程结束
final ExecutorService exec = Executors.newFixedThreadPool(15); // 线程池管理线程
for (int j = 0; j < 30; j++) {
exec.submit(new ServiceThread(begin, end, j)); // 多个线程运动员准备中
}
begin.countDown(); // 发送开始号令,多线程运动员们开始运行
try {
end.await(); // 等待子线程全都运行完成之后,执行后续动作
} catch (InterruptedException e) {
log.error(e.getMessage());
}
exec.shutdown();
System.out.println("Done");
}
@Data
@AllArgsConstructor
public class ServiceThread implements Runnable {
private CountDownLatch begin;
private CountDownLatch end;
private int j;
@Override
public void run() {
try {
begin.await(); // 线程运动员准备中,阻塞在这等待其它运动员准备完成,然后等待号令begin.countDown()之后开始运行
for (int i = 0; i < 10; i++) {
System.out.println("j是" +j+",i是"+i);
}
} catch (Exception e) {
} finally {
/*
* 一个子线程运动员完成了,但主线程会阻塞在end.await()这里,
* 当end这个减为0时,将越过end.await()方法,执行后续操作。
* */
end.countDown();
}
}
}
CountDownLatch实现原理
1、创建计数器
当我们调用CountDownLatch countDownLatch=new CountDownLatch(4) 时候,此时会创建一个AQS的同步队列,并把创建CountDownLatch 传进来的计数器赋值给AQS队列的 state,所以state的值也代表CountDownLatch所剩余的计数次数;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);//创建同步队列,并设置初始计数器值
}
2、阻塞线程
当我们调用countDownLatch.wait()的时候,会创建一个节点,加入到AQS阻塞队列,并同时把当前线程挂起。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
判断计数器是计数完毕,未完毕则把当前线程加入阻塞队列
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//锁重入次数大于0 则新建节点加入阻塞队列,挂起当前线程
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
构建阻塞队列的双向链表,挂起当前线程
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//新建节点加入阻塞队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获得当前节点pre节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//返回锁的state
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//重组双向链表,清空无效节点,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3、计数器递减
当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放锁的方式,对state进行减1操作,当state=0的时候证明计数器已经递减完毕,此时会将AQS阻塞队列里的节点线程全部唤醒。
public void countDown() {
//递减锁重入次数,当state=0时唤醒所有阻塞线程
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//递减锁的重入次数
if (tryReleaseShared(arg)) {
doReleaseShared();//唤醒队列所有阻塞的节点
return true;
}
return false;
}
private void doReleaseShared() {
//唤醒所有阻塞队列里面的线程
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//节点是否在等待唤醒状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始
continue;
unparkSuccessor(h);//成功则唤醒线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}