- 1 Semaphore的使用与原理 [ˈsɛməˌfɔr, -ˌfor]
- 2 CountDown(倒计时锁)的使用与应用
- 3 Cyclicbarrier(循环栅栏)的使用以及注意点(批量任务循环执行)
- 参考资料
1 Semaphore的使用与原理 [ˈsɛməˌfɔr, -ˌfor]
1-1 概述
应用场景:用来限制能同时访问共享资源的线程上限
实例:每个时刻最多三个线程访问资源
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
@Slf4j(topic = "c.test19")
public class test19 {
public static void main(String[] args) {
// 1. 创建 semaphore 对象
// permits参数限制访问线程数目, fair参数控制是否公平
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行,但同一时刻最多只有三个线程获得资源
// semaphore限制了访问资源的线程数目
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("end...");
} finally {
semaphore.release();
}
}).start();
}
}
}
执行结果:同一时刻最多有三个线程拿到资源
21:16:12.378 [Thread-1] DEBUG c.test19 - running...
21:16:12.378 [Thread-2] DEBUG c.test19 - running...
21:16:12.378 [Thread-0] DEBUG c.test19 - running...
21:16:13.383 [Thread-1] DEBUG c.test19 - end...
21:16:13.383 [Thread-2] DEBUG c.test19 - end...
21:16:13.383 [Thread-0] DEBUG c.test19 - end...
21:16:13.383 [Thread-3] DEBUG c.test19 - running...
21:16:13.383 [Thread-4] DEBUG c.test19 - running...
21:16:13.383 [Thread-5] DEBUG c.test19 - running...
21:16:14.384 [Thread-4] DEBUG c.test19 - end...
21:16:14.384 [Thread-5] DEBUG c.test19 - end...
21:16:14.384 [Thread-6] DEBUG c.test19 - running...
21:16:14.385 [Thread-7] DEBUG c.test19 - running...
21:16:14.385 [Thread-3] DEBUG c.test19 - end...
21:16:14.385 [Thread-8] DEBUG c.test19 - running...
21:16:15.385 [Thread-8] DEBUG c.test19 - end...
21:16:15.386 [Thread-9] DEBUG c.test19 - running...
21:16:15.386 [Thread-7] DEBUG c.test19 - end...
21:16:15.386 [Thread-6] DEBUG c.test19 - end...
21:16:16.387 [Thread-9] DEBUG c.test19 - end...
1-2 Semaphore的应用:限制对共享资源的使用
1-2-1 应用场景
1)使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数
---(资源的数目与请求的线程(请求成功才分配资源,一个线程可能分配多种类型资源)的数目不是一个概念)。
2)用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好。
- 当一个连接对应一个资源,比如数据库连接池就是一个线程对应一个数据库连接,这种2)场景非常适合使用semaphore。
1-2-2 使用semaphore优化自定义的数据库连接池
使用wait/notify结合原子数组实现基于享元模式的连接池
- 之前的wait/notify主要在线程池资源全部分配完了调用wait使得的线程进入waitset,当有线程释放资源调用notify唤醒线程来使用资源
优化后代码
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerArray;
@Slf4j(topic = "c.pool")
class Pool{
// 01 连接池大小
private final int poolSize;
// 02 连接对象数组
private Connection[] connections;
// 03 连接状态数组,0表示空闲,1表示繁忙
private AtomicIntegerArray states;
// 04 使用semaphore管理线程池的容量
private Semaphore semaphore;
Pool(int poolSize){
this.poolSize = poolSize;
this.semaphore = new Semaphore(poolSize);
this.states = new AtomicIntegerArray(new int[poolSize]);
this.connections = new Connection[poolSize];
for(int i = 0;i < poolSize;++i){
this.connections[i] = new MockConnection();
}
}
public Connection borrow(){
try {
semaphore.acquire(); // 获取资源的许可,如果没有资源则会让线程
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i = 0;i < poolSize;++i){
if(states.get(i) == 0){
// 注意:这里必须采用CAS操作确保多个线程状态变量的安全性
states.compareAndSet(i,0,1);
log.warn("borrow connection {}",i);
return connections[i];
}
}
return null;
}
public void free(Connection conn){
for(int i = 0;i < poolSize;++i){
if(connections[i] == conn){
states.set(i,0);
log.warn("free connection {}",i);
semaphore.release(); // 释放许可
break;
}
}
}
}
public class test20 {
public static void main(String[] args) {
Pool pool = new Pool(2);
for(int i = 0;i < 5;++i){
new Thread(()->{
Connection tmp = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(tmp);
}).start();
}
}
}
// mock:虚假的
// 这里实现了一个虚假的连接池对象
class MockConnection implements Connection{
.....
}
执行结果
21:52:13.475 [Thread-1] WARN c.pool - borrow connection 0
21:52:13.475 [Thread-0] WARN c.pool - borrow connection 0
21:52:13.675 [Thread-0] WARN c.pool - free connection 0
21:52:13.675 [Thread-2] WARN c.pool - borrow connection 0
21:52:14.364 [Thread-1] WARN c.pool - free connection 0
21:52:14.364 [Thread-3] WARN c.pool - borrow connection 0
21:52:14.531 [Thread-2] WARN c.pool - free connection 0
21:52:14.531 [Thread-4] WARN c.pool - borrow connection 0
21:52:14.546 [Thread-4] WARN c.pool - free connection 0
21:52:14.916 [Thread-3] WARN c.pool - free connection 0
总结:semaphore的合理使用取代了wait/notify,使得代码更加简洁吗,容易读
1-3 Semaphore的原理(AQS等待队列节点模式是Shared的)
概述:其基本思想与读写锁的读锁的获取与释放基本一致
1-3-1 Semaphore的构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- 可以看到Semaphore也有2类同步器:公平同步器与非公平同步器
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
========================================================================================
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) { // 许可的数目被存储在AQS的state中
setState(permits);
}
final int getPermits() {
return getState();
}
......
}
- 可以看到许可的数目被存储在AQS的state中。
1-3-1 Semaphore的acquire方法原理(获取资源的数目)
基本思路
分为二种情况讨论:
情况1:当前资源足够,则修改资源的数目
情况2:资源不足的情况下,则将线程放入AQS的的等待队列。并用park停止运行。
一些细节:
1)state的维护(CAS机制)
2)队列的维护(CAS机制)
3)第一次失败后,还会进行几次尝试获取资源
源码分析
semaphore.acquire(); // 获取资源的许可,如果没有资源则会让线程阻塞
==================================================================================
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
==================================================================================
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // step1:尝试获取args数量的许可,如果申请失败返回-1
doAcquireSharedInterruptibly(arg);
// step2:将线程加入AQS阻塞队列,可能尝试几次,之后调用park方法让线程停止运行
}
===tryAcquireShared(arg):的两个公平/非公平实现===================================
final int nonfairTryAcquireShared(int acquires) {su
for (;;) {
int available = getState();
int remaining = available - acquires; //可获的许可数-线程申请的许可数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining; // 如果资源足够返回剩下的许可数
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
==================================================================================
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
======================================================================================
获取资源的图示过程
1-3-1 Semaphore的release方法原理
基本思想:
当前线程释放*拥有的的资源,并修改state,根据当前节点的waitstate判断是否去AQS的等待队列中去唤醒一个线程去获取资源。
细节:
1)semaphore的AQS阻塞队列的节点时Shared的状态,只要资源充足唤醒会propagate.
semaphore.release(); // 释放当前线程拥有的许可
====================================================================
public void release() {
sync.releaseShared(1);
}
=====================================================================
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
========================================================================
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
===========================================================================
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // SIGNAL表明存在后续的节点进行唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
======================================================================
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒线程
}
=============唤醒后的线程进行propagate唤醒==================================================
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 只要资源充足唤醒会propagate
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 在此被唤醒
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
=========================================================================================
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 再次唤醒后面的节点
}
}
释放资源的图示过程
2 CountDown(倒计时锁)的使用与应用
[kaʊnt]
3-1 概述
应用场景:用来进行线程同步协作,等待所有线程完成倒计时。
- 构造参数用来初始化等待计数值
- await() 用来等待计数归零
- countDown() 用来让计数减一
与join的区别:join相对而言属于比较底层的API,使用比较繁琐,且必须等待线程结束,CountDownLatch提供了更加灵活的与线程同步的方式(线程可以在完成某个条件调用countDown()方法)。
源码速看
本质上利用了AQS的state变量计数实现了类似join的功能
- countDown() 让锁的state减去1
- await() 则当state不等于0的情况下,阻塞调用线程。
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 当state==0能够获得锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放锁的时候,当state为0的时候,返回false;
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 只有当state为0的时候,才能通过该方法获得锁,否则调用线程会一直阻塞。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
======================================================================================
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3-2 简单应用
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
@Slf4j(topic = "c.test21")
public class test21 {
public static void main(String[] args) throws InterruptedException {
/*本质上CountDownLatch就是让state充当了计数功能,
利用了AQS继承的锁在state不为0的情况下无法获得锁阻塞的特性,实现了类似join函数的功能*/
CountDownLatch latch = new CountDownLatch(3);
new Thread(()->{
log.warn("begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("end...");
latch.countDown(); //线程执行完毕,让state--
},"t1").start();
new Thread(()->{
log.warn("begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("end...");
latch.countDown(); //线程执行完毕,让state--
},"t2").start();
new Thread(()->{
log.warn("begin");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("end...");
latch.countDown(); //线程执行完毕,让state--
},"t3").start();
log.warn("start waiting");
latch.await(); // state == 0的时候,才会往下执行,否则进入AQS等待队列
log.warn("end waiting");
}
}
执行结果
- 主线程等待其他三个线程执行完毕才往下执行
10:01:58.911 [t2] WARN c.test21 - begin
10:01:58.911 [t3] WARN c.test21 - begin
10:01:58.911 [main] WARN c.test21 - start waiting
10:01:58.911 [t1] WARN c.test21 - begin
10:01:59.917 [t1] WARN c.test21 - end...
10:01:59.917 [t2] WARN c.test21 - end...
10:02:01.916 [t3] WARN c.test21 - end...
10:02:01.916 [main] WARN c.test21 - end waiting
3-3 CountDownLatch配合线程池使用
package chapter8;
import ch.qos.logback.core.util.ExecutorServiceUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "c.test22")
public class test22 {
public static void main(String[] args) throws InterruptedException {
/*本质上CountDownLatch就是让state充当了计数功能,利用了AQS继承的锁在state不为0的情况下无法获得锁阻塞的特性,实现
* 了类似join函数的功能*/
CountDownLatch latch = new CountDownLatch(3);
ExecutorService pool = Executors.newFixedThreadPool(4);
pool.submit(()->{
log.warn("t1 begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("t1 end...");
latch.countDown(); //线程执行完毕,让state--
});
pool.submit(()->{
log.warn("t2 begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("t2 end...");
latch.countDown(); //线程执行完毕,让state--
});
pool.submit(()->{
log.warn("t3 begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("t3 end...");
latch.countDown(); //线程执行完毕,让state--
});
pool.submit(()->{
log.warn("start waiting");
try {
latch.await(); // state == 0的时候,才会往下执行
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("end waiting");
});
}
}
执行结果
- 线程池中三个线程负责处理,一个线程负责汇总结果
10:34:19.232 [pool-1-thread-3] WARN c.test22 - t3 begin
10:34:19.232 [pool-1-thread-1] WARN c.test22 - t1 begin
10:34:19.232 [pool-1-thread-4] WARN c.test22 - start waiting
10:34:19.232 [pool-1-thread-2] WARN c.test22 - t2 begin
10:34:20.238 [pool-1-thread-2] WARN c.test22 - t2 end...
10:34:20.238 [pool-1-thread-3] WARN c.test22 - t3 end...
10:34:20.238 [pool-1-thread-1] WARN c.test22 - t1 end...
10:34:20.238 [pool-1-thread-4] WARN c.test22 - end waiting
3-4 CountDownLatch的典型应用场景介绍
场景1:多个游戏资源并发加载完成通知
在这个场景下,某个线程进入到下一个流程,需要多个资源加载好,这里假定为10个,可以采用线程池,令该线程提交10个任务分别加载一个资源,然后调用countDownLatch.await去等待所有资源加载完毕,每个任务准备好资源就countDown。
实例
package chapter8;
import java.util.*;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class test23{
public static void main(String[] args) throws InterruptedException {
AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
return new Thread(r, "t" + num.getAndIncrement());
});
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) { // 模拟提交10个线程任务
int x = j;
// 线程池中每个任务模拟加载资源,当资源加载完毕,调用countDown(),将state--
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
}
all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始...");
service.shutdown();
}
}
执行结果
- 10个资源都加载好了,才进行下一个流程
[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%),t8(100%), t9(100%)]
游戏开始...
场景2微服务场景下多个RPC远程调用并发执行
在这个场景下,当前用户的请求,需要多个服务器资源,比如需要商品信息,订单信息,快递信息共三个信息,则需要进行三次远程调用,如果顺序执行的话,显然效率比较低下。
此时可以采用多线程的方式利用线程池配合CountDownLatch并发的去获取资源(采用线程的future对象返回线程执行完返回的结果),节约时间。
3 Cyclicbarrier(循环栅栏)的使用以及注意点(批量任务循环执行)
[ˈsaɪklɪk] [ˈbæriər]
3-1 Cyclicbarrier概述
作用:用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执
行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。
- 每次调用 await()会令state减去1,当state为0,会调用构造函数的第二个参数runable接口。
应用场景:应用于需要重复执行一批任务。可以看成循环版的CountDownLatch。
3-2 实例
package chapter8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j(topic = "c.test24")
public class test24 {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
/*参数1:线程池的大小, 参数2: state为0,调用实现的runable接口*/
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task2 finish...");
});
/* 将线程池中的2个task循环执行三次,CyclicBarrier计数为0的时候会自定恢复到初值,即
这个类是可重用的。*/
for (int i = 0; i < 3; i++) { // task1 task2 task1
service.submit(() -> {
log.debug("task1 begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await(); // 2-1=1
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
log.debug("task2 begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
barrier.await(); // 1-1=0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
执行结果
11:57:12.345 [pool-1-thread-2] DEBUG c.test24 - task2 begin...
11:57:12.345 [pool-1-thread-1] DEBUG c.test24 - task1 begin...
11:57:14.349 [pool-1-thread-2] DEBUG c.test24 - task1, task2 finish...
11:57:14.349 [pool-1-thread-2] DEBUG c.test24 - task1 begin...
11:57:14.349 [pool-1-thread-1] DEBUG c.test24 - task2 begin...
11:57:16.349 [pool-1-thread-1] DEBUG c.test24 - task1, task2 finish...
11:57:16.349 [pool-1-thread-1] DEBUG c.test24 - task1 begin...
11:57:16.349 [pool-1-thread-2] DEBUG c.test24 - task2 begin...
11:57:18.349 [pool-1-thread-2] DEBUG c.test24 - task1, task2 finish...
注意点
循环执行固定数量任务,一定要确保线程池大小与CyclicBarrier第一个参数一致
Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task2 finish...");
});
线程池大小与初始计数不一致:
Executors.newFixedThreadPool(3);
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task2 finish...");
});
执行结果
11:55:39.325 [pool-1-thread-3] DEBUG c.test24 - task1 begin...
11:55:39.325 [pool-1-thread-2] DEBUG c.test24 - task2 begin...
11:55:39.325 [pool-1-thread-1] DEBUG c.test24 - task1 begin...
11:55:40.329 [pool-1-thread-3] DEBUG c.test24 - task1, task2 finish...
11:55:40.329 [pool-1-thread-3] DEBUG c.test24 - task2 begin...
11:55:40.329 [pool-1-thread-1] DEBUG c.test24 - task1 begin...
11:55:41.330 [pool-1-thread-1] DEBUG c.test24 - task1, task2 finish...
11:55:41.330 [pool-1-thread-1] DEBUG c.test24 - task2 begin...
11:55:43.330 [pool-1-thread-1] DEBUG c.test24 - task1, task2 finish...
从结果可以看到在第一个循环中任务还没有执行完前,多余的空闲进程会执行下一个循环中同样任务。