我在《JDK1.5引入的concurrent包》中,曾经介绍过CountDownLatch、CyclicBarrier两个类,还给出了CountDownLatch的演示案例。这里再系统总结下Java并发编程中的4个类CountDownLatch、CyclicBarrier、Semaphore、Phaser。
1.CountDownLatch
CountDownLatch可以理解为一个计数器在初始化时设置初始值,当一个线程需要等待某些操作先完成时,需要调用await()方法。这个方法让线程进入休眠状态直到等待的所有线程都执行完成。每调用一次countDown()方法,内部计数器减1,直到计数器为0时唤醒。这个可以理解为特殊的CyclicBarrier。
核心方法两个:countDown()和await()
countDown():使CountDownLatch维护的内部计数器减1,每个被等待的线程完成的时候调用
await():线程在执行到CountDownLatch的时候会将此线程置于休眠
案例场景:视频会议室里等与会人员到齐了会议才能开始。
package com.itszt.test3;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 视频会议室里等与会人员到齐了会议才能开始
*/
public class CountDownLatchTest {
private static int num=10;//与会人员数量
public static void main(String[] args) {
VideoConference conference = new VideoConference(num);
Thread threadConference = new Thread(conference);
threadConference.start();//开启await()方法,在内部计数器为0之前线程处于等待状态
for (int i = 0; i < num; i++) {
Participant p = new Participant(conference, "Participant " + i);
Thread t = new Thread(p);
t.start();
}
}
} //视频会议类
class VideoConference implements Runnable {
private final CountDownLatch controller; public VideoConference(int number) {
//计数器内等待其他线程的初始化数目
controller = new CountDownLatch(number);
} public void arrive(String name) {
System.out.printf("%s has arrived.\n", name);
controller.countDown();//调用countDown()方法,使内部计数器减1
System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
} @Override
public void run() {
synchronized (VideoConference.class){
if(controller.getCount()!=0){
System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
}
}
try {
controller.await();//等待,直到CoutDownLatch计数器为0 System.out.printf("VideoConference: All the participants have come\n");
System.out.printf("VideoConference: Let's start...\n");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} //参加会议的人员类
class Participant implements Runnable { private VideoConference conference;
private String name; public Participant(VideoConference conference, String name) {
this.conference = conference;
this.name = name;
} @Override
public void run() {
Long duration = (long) (Math.random() * 10);
try {
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
conference.arrive(name);//每到一个人员,CountDownLatch计数器就减少1
}
}
代码执行结果如下:
VideoConference: Initialization: 10 participants.
Participant 3 has arrived.
Participant 7 has arrived.
VideoConference: Waiting for 9 participants.
VideoConference: Waiting for 8 participants.
Participant 4 has arrived.
VideoConference: Waiting for 7 participants.
Participant 9 has arrived.
VideoConference: Waiting for 6 participants.
Participant 2 has arrived.
Participant 1 has arrived.
VideoConference: Waiting for 5 participants.
VideoConference: Waiting for 4 participants.
Participant 5 has arrived.
Participant 8 has arrived.
VideoConference: Waiting for 3 participants.
VideoConference: Waiting for 2 participants.
Participant 0 has arrived.
VideoConference: Waiting for 1 participants.
Participant 6 has arrived.
VideoConference: Waiting for 0 participants.
VideoConference: All the participants have come
VideoConference: Let's start...
需要注意的是,CountDownLatch是一个线程计数器。等计数器为0时,那些先前因调用await()方法休眠的线程被唤醒。CountDownLatch能够控制的线程是哪些呢?是那些调用了CountDownLatch的await()方法的线程。案例中,先运行await()方法的线程是视频会议的线程,然后执行与会者 线程,这里的处理是每到一位(每创建一个线程并运行run()方法时就使计数器减1)就让计数器减1,等计数器减为0时唤醒因调用await()方法进入休眠的线程。这里的与会者线程就是视频会议线程要等待的线程。
2.CyclicBarrier
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
当一个线程到达集合点时,它将调用await()方法等待其它的线程。线程调用await()方法后,CyclicBarrier将阻塞这个线程,并将它置入休眠状态等待其它线程的到来。等最后一个线程调用await()方法时,CyclicBarrier将唤醒所有等待的线程,然后这些线程将继续执行。CyclicBarrier可以传入另一个Runnable对象作为初始化参数。当所有的线程都到达集合点后,CyclicBarrier类将Runnable对象作为线程执行。
方法
await():使线程置入休眠直到最后一个线程的到来之后唤醒所有休眠的线程 CyclicBarrier类有两个常用的构造方法:
(1)CyclicBarrier(int parties)
这里的parties也是一个计数器,例如,初始化时parties里的计数是3,于是拥有该CyclicBarrier对象的线程当parties的计数为3时就唤醒,注意:这里parties里的计数在运行时当调用CyclicBarrier:await()时,计数就加1,一直加到初始的值。
(2)CyclicBarrier(int parties, Runnable barrierAction)
这里的parties与上一个构造方法的解释是一样的,这里需要解释的是第二个入参(Runnable barrierAction),这个参数是一个实现Runnable接口的类的对象,也就是说当parties加到初始值时就触发barrierAction的内容。
案例场景:有4个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。这和CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干自己的其他事情,而这里的线程需要等待其他线程后才能继续完成后面的工作。
package com.itszt.test3;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 测试CyclicBarrier
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4,
new Runnable() {
@Override
public void run() {
System.out.println("所有玩家进入第 2 关!");
}
});
for (int i = 1; i <= 4; i++) {
new Thread(new Player(i, cyclicBarrier)).start();
}
}
} /**
* 玩家类
*
* @author itmyhome
*/
class Player implements Runnable {
private CyclicBarrier cyclicBarrier;
private int id; public Player(int id, CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
this.id = id;
} @Override
public void run() {
try {
System.out.println("玩家" + id + "正在玩第 1 关...");
cyclicBarrier.await();
System.out.println("玩家" + id + "进入第 2 关...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
代码执行结果如下:
玩家1正在玩第 1 关...
玩家3正在玩第 1 关...
玩家2正在玩第 1 关...
玩家4正在玩第 1 关...
所有玩家进入第 2 关!
玩家4进入第 2 关...
玩家1进入第 2 关...
玩家3进入第 2 关...
玩家2进入第 2 关...
3.Semaphore
信号量就是可以声明多把锁(包括一把锁,此时为互斥信号量)。
举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间,而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。
常用方法
acquire():获取信号量,信号量内部计数器减1
release():释放信号量,信号量内部计数器加1
tryAcquire():这个方法试图获取信号量,如果能够获取返回true,否则返回false
信号量控制的线程数量在声明时确定。例如:
Semaphore s = new Semaphore(2);
可以说,Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。
AQS(AbstractQueuedSynchronizer,抽象的队列式同步器)是 java.util.concurrent的基础。
Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等虽然各自都有不同特征,
但是简单看一下源码,每个类内部都包含一个如下的内部类定义:
abstract static class Sync extends AbstractQueuedSynchronizer;
所有java.util.concurrent包中的同步器类都声明了一个私有的继承了AbstractQueuedSynchronizer
的内部类,并且把所有同步方法都委托给这个内部类。这样各个同步器类的公开方法就可以使用适合自己的名称。子类只需定义状态的检查与更新相关的方法,这些方法控制着acquire和 release操作。
AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词。state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
同步器背后的基本思想非常简单。acquire操作如下:
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
release操作如下:
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
为了实现上述操作,需要下面三个基本组件的相互协作:
- 同步状态的原子性管理;
- 线程的阻塞与解除阻塞;
- 队列的管理;
同步器框架的核心决策是为这三个组件选择一个具体实现,同时在使用方式上又有大量选项可用。这里有意地限制了其适用范围,但是提供了足够的效率,使得实际上没有理由在合适的情况下不用这个框架而去重新建造一个。
到此,我们再继续看Semaphore同步器。为了简单起见,我们以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时,如果同时来了五辆车,看门人允许其中三辆不受阻碍地进入,然后放下车拦,剩下的车则必须在停车场外的入口处等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,升起车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。在这个场景中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么停车场在进入两辆车后,其后的车辆就要等到有车离开才能获准许进入。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。
Semaphore的主要方法有:
Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。
Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。
void acquire():从该信号量获取一个许可前,线程将一直阻塞。相当于一辆车占了一个车位。
void acquire(int n):从该信号量获取给定数目许可,在提供这些许可前,一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
void release(int n):释放n个许可。
int availablePermits():当前可用的许可数。
接下来写一个案例,有7个人,各自获取信号量的许可后,再释放许可。
package com.itszt.test3;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 测试Semaphore
*/
public class SemaphoreTest {
private static final Semaphore semaphore = new Semaphore(3);//默认为非公平信号量
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
//信号量控制的线程
private static class InformationThread extends Thread {
private final String name;
private final int age; public InformationThread(String name, int age) {
this.name = name;
this.age = age;
} public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + ":大家好,我是" + name + "我今年" + age + "岁当前时间为:" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(name + "要准备释放许可证了,当前时间为:" + System.currentTimeMillis());
System.out.println("当前可使用的许可数为:" + semaphore.availablePermits());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public static void main(String[] args) {
String[] name = {"李明", "王五", "张杰", "王强", "赵二", "李四", "张三"};
int[] age = {26, 27, 33, 45, 19, 23, 41};
for (int i = 0; i < 7; i++) {
Thread t1 = new InformationThread(name[i], age[i]);
threadPool.execute(t1);
}
}
}
上述代码执行结果如下:
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1524367640560
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1524367640560
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1524367640560
李明要准备释放许可证了,当前时间为:1524367641560
王五要准备释放许可证了,当前时间为:1524367641560
张杰要准备释放许可证了,当前时间为:1524367641560
当前可使用的许可数为:0
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1524367641560
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1524367641560
pool-1-thread-2:大家好,我是李四我今年23岁当前时间为:1524367641560
李四要准备释放许可证了,当前时间为:1524367642563
赵二要准备释放许可证了,当前时间为:1524367642563
王强要准备释放许可证了,当前时间为:1524367642563
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-3:大家好,我是张三我今年41岁当前时间为:1524367642563
当前可使用的许可数为:0
张三要准备释放许可证了,当前时间为:1524367643563
当前可使用的许可数为:2
我们上面用的是非公平信号量,改为公平信号量:
private static final Semaphore semaphore = new Semaphore(3,true);
这时运行结果如下:
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1524367824968
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1524367824968
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1524367824968
李明要准备释放许可证了,当前时间为:1524367825968
王五要准备释放许可证了,当前时间为:1524367825968
张杰要准备释放许可证了,当前时间为:1524367825968
当前可使用的许可数为:0
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1524367825968
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1524367825968
pool-1-thread-3:大家好,我是李四我今年23岁当前时间为:1524367825968
王强要准备释放许可证了,当前时间为:1524367826968
李四要准备释放许可证了,当前时间为:1524367826968
赵二要准备释放许可证了,当前时间为:1524367826968
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-1:大家好,我是张三我今年41岁当前时间为:1524367826968
当前可使用的许可数为:0
张三要准备释放许可证了,当前时间为:1524367827968
当前可使用的许可数为:2
Semaphore信号量的实现和ReetrantLock类似,都是通过内部类Sync,Sync是一个继承于AQS的抽象类; Semaphore信号量和ReentrantLock互斥锁的实现区别在于,ReentrantLock互斥锁的state如果为0则表示锁未被占用,如果为0之外的数值表示锁被重入的次数;Semaphore信号量的state表示许可的数目; Sync包括两个子类:公平信号量FairSync和非公平信号量NonfailrSync,默认是非公平信号量NonfairSync。其中,公平信号量是指如果线程不在同步队列头部则排队等候;非公平信号量是指无论当前线程是否在同步队列头部,都会尝试获取信号量。
信号量如果要实现单例模式,可以这样修改:
private static final Semaphore semaphore=new Semaphore(1);
再执行代码,结果则如下:
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1524368235314
李明要准备释放许可证了,当前时间为:1524368236317
当前可使用的许可数为:0
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1524368236317
张杰要准备释放许可证了,当前时间为:1524368237317
当前可使用的许可数为:0
pool-1-thread-3:大家好,我是张三我今年41岁当前时间为:1524368237317
张三要准备释放许可证了,当前时间为:1524368238317
当前可使用的许可数为:0
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1524368238317
赵二要准备释放许可证了,当前时间为:1524368239317
当前可使用的许可数为:0
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1524368239317
王五要准备释放许可证了,当前时间为:1524368240317
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1524368240317
王强要准备释放许可证了,当前时间为:1524368241317
当前可使用的许可数为:0
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1524368241317
李四要准备释放许可证了,当前时间为:1524368242317
当前可使用的许可数为:0
可见,Semaphore将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,然后这辆车出来后,下一辆车才能进。
另外,我们在上面的案例中用到了线程池:
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
其中,ThreadPoolExecutor的构造方法体系有:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
对于构造方法的参数说明如下:
corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。
maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。
keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。
unit
指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。
workQueue
线程池中的任务队列。
常用的队列有:LinkedBlockingQueue,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
threadFactory
线程工厂,提供创建新线程的功能。ThreadFactory是一个接口,只有一个方法:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
RejectedExecutionHandler
RejectedExecutionHandler也是一个接口,只有一个方法
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
当线程池中的资源已经全部使用,添加新线程又被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。
线程池的线程执行规则跟任务队列有很大的关系。其中:
(1)在任务队列没有大小限制时:
①如果线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中。
② 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
③如果线程数量>核心线程数,但<=最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
④如果线程数量>核心线程数,并且>最大线程数,当任务队列是LinkedBlockingDeque时,会将超过核心线程的任务放在任务队列中排队。也就是说,当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,它的线程数最多不会超过核心线程数。
⑤如果线程数量>核心线程数,并且>最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。
(2)在任务队列大小有限时:
①当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
②SynchronousQueue没有数量限制。因为它根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
在ThreadPoolExecutor中用到了BlockingQueue阻塞队列的接口。请参考我的另一篇博文《Java中的BlockingQueue》。
4.Phaser
Phaser是一个更加复杂和强大的同步辅助类,它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
可以说,Phaser允许并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。
一个Phaser对象有两种状态:
- 活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
- 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,在终止状态下,Phaser没有任何参与者。当Phaser对象onAdvance()方法返回True时,Phaser对象就处于终止态。当Phaser处于终止态时,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步操作。
arriveAndAwaitAdvance():类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
arriveAndDeregister():把执行到此的线程从Phaser中注销掉
isTerminated():判断Phaser是否终止
register():将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
forceTermination():强制Phaser进入终止态
案例场景:Phaser将同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内改过扩展名为.txt的文件。这个任务分解为三个步骤:①在指定文件夹及其子文件夹中获得扩展名为.txt的文件;②对第一步的结果过滤,删除修改时间超过24小时的文件;③将结果打印数据到控制台。
package com.itszt.test3;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
* 测试Phaser
*/
public class PhaserTest {
public static void main(String[] args) {
Phaser phaser=new Phaser(3);
FileSearch system=new FileSearch("E:\\a", ".txt",
phaser);
FileSearch apps=new FileSearch("E:\\b", ".txt",
phaser);
FileSearch documents=new FileSearch("E:\\c", ".txt",
phaser);
Thread systemThread=new Thread(system, "system-a");
systemThread.start();
Thread appsThread=new Thread(apps, "apps-b");
appsThread.start();
Thread documentsThread=new Thread(documents, "documents-c");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Terminated:"+ phaser.isTerminated());
}
}
class FileSearch implements Runnable {
private String initPath;// 查找路径
private String end;// 文件后缀
private List<String> results;// 结果集
private Phaser phaser; public FileSearch(String initPath, String end, Phaser phaser) {
this.initPath = initPath;
this.end = end;
this.phaser = phaser;
this.results = new ArrayList<String>();
} private void direactoryProcess(File file) {
File list[] = file.listFiles();
if (list != null) {
for (File f : list) {
if (f.isDirectory()) {
direactoryProcess(f);
} else {
fileProcess(f);
}
}
}
} private void fileProcess(File file) {
if (file.getName().endsWith(end)) {
results.add(file.getAbsolutePath());
}
} private void filterResult() {
List<String> newResult = new ArrayList<String>();
long actualDate = new Date().getTime();
for (int i = 0; i < results.size(); i++) {
File file = new File(results.get(i));
long lastModifyTime = file.lastModified();
if (actualDate - lastModifyTime < TimeUnit.MICROSECONDS.
convert(1,
TimeUnit.DAYS)) {
newResult.add(results.get(i));
}
}
results = newResult;
} private boolean checkResults() {
if (results.isEmpty()) {
System.out.println(Thread.currentThread().
getName() + ": Phase "
+ phaser.getPhase() + " 0 result");
System.out.println(Thread.currentThread().
getName() + ": Phase "
+ phaser.getPhase() + " end");
phaser.arriveAndDeregister();
return false;
} else {
System.out.println(Thread.currentThread().
getName() + ": Phase "
+ phaser.getPhase() + " " +
results.size() + " result");
phaser.arriveAndAwaitAdvance();
return true;
}
} private void showInfo() {
for (int i = 0; i < results.size(); i++) {
System.out.println(Thread.currentThread().
getName() + ":"
+ results.get(i));
}
phaser.arriveAndAwaitAdvance();
} @Override
public void run() {
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().
getName()+": Starting");
File file=new File(initPath);
if(file.isDirectory()){
direactoryProcess(file);
}
if(!checkResults()){
return;
}
filterResult();
if(!checkResults()){
return;
}
showInfo();
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().
getName()+": Work completed");
}
}
控制台打印如下:
system-a: Starting
system-a: Phase 1 1 result
apps-b: Starting
documents-c: Starting
documents-c: Phase 1 1 result
apps-b: Phase 1 1 result
apps-b: Phase 2 1 result
system-a: Phase 2 1 result
documents-c: Phase 2 1 result
documents-c:E:\c\jsp技术.txt
apps-b:E:\b\jsp技术.txt
system-a:E:\a\jsp技术.txt
system-a: Work completed
documents-c: Work completed
apps-b: Work completed
Terminated:true