Phaser
java7中引入了一种新的可重复使用的同步屏障,称为移相器Phaser.
Phaser拥有与CyclicBarrier和CountDownLatch类似的功劳.但是这个类提供了更加灵活的应用.CountDownLatch和CyclicBarrier都是只适用于固定数量的参与者.移相器适用于可变数目的屏障,在这个意义上,可以在任何时间注册新的参与者.并且在抵达屏障是可以注销已经注册的参与者.因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化
如CyclicBarrier一样,移相器可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达.因此,移相器会有多代.一旦为某个特定相位注册的所有参与者都到达移相器,就增加相数.相数从零开始,在达到Integer.MAX_VALUE后,再次绕回0.当移相器发生变化时,通过重写onAdvance方法,可以自行可选操作.这个方法也可用于终止移相器.移相器一旦被终止,所有的同步方法就会立即返回,并尝试注册新的失败的参与者
移相器的另一个重要特征是:移相器可能是分层的,这允许你以树形结构来安排移相器以减少竞争.很明显,更小的组将拥有更少的竞争同步的参与者.因此,将大量的参与者分成较小的组可以减少竞争.虽然创建移相器能增加中的吞吐量,但是这需要更多的开销.最后,移相器的另一个重要的特征在于监控功能,使用独立的对象可以监视移相器的当前状态.监视器可以查询注册到移相器的参与者的数量,以及已经到达和还没有到达某个特定相数的参与者的数量
Phaser中是通过计数器来控制。在Phaser中计数器叫做parties, 我们可以通过Phaser的构造函数或者register()方法来注册
通过调用register()方法,我们可以动态的控制phaser的个数。如果我们需要取消注册,则可以调用arriveAndDeregister()方法,我们看下arrive:
public int arrive() {
return doArrive(ONE_ARRIVAL);
}
Phaser中arrive实际上调用了doArrive方法,doArrive接收一个adjust参数,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister
Phaser中的arrive()、arriveAndDeregister()方法,这两个方法不会阻塞,但是会返回相应的phase数字,当此phase中最后一个party也arrive以后,phase数字将会增加,即phase进入下一个周期,同时触发(onAdvance)那些阻塞在上一phase的线程。这一点类似于CyclicBarrier的barrier到达机制;更灵活的是,我们可以通过重写onAdvance方法来实现更多的触发行为
下面看一个基本的使用:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
上面的例子中,我们在执行每个Runnable之前调用register()来注册, 然后调用arriveAndAwaitAdvance()来等待这一个Phaser周期结束。最后我们调用 phaser.arriveAndDeregister()来取消注册主线程
下面来详细的分析一下运行步骤:
final Phaser phaser = new Phaser(1);
这一步我们初始化了一个Phaser,并且指定其现在party的个数为1
phaser.register();
这一步注册Runnable task到phaser,同时将party+1
phaser.arriveAndAwaitAdvance()
这一步将会等待直到所有的party都arrive。这里只会将步骤2中注册的party标记为arrive,而步骤1中初始化的party一直都没有被arrive。
phaser.arriveAndDeregister();
在主线程中,arrive了步骤1中的party,并且将party的个数减一
步骤3中的phaser.arriveAndAwaitAdvance()将会继续执行,因为最后一个phaser在步骤4中arrive了
多个Phaser周期
Phaser的值是从0到Integer.MAX_VALUE,每个周期过后该值就会加一,如果到达Integer.MAX_VALUE则会继续从0开始。
如果我们执行多个Phaser周期,则可以重写onAdvance方法:
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
onAdvance将会在最后一个arrive()调用的时候被调用,如果这个时候registeredParties为0的话,该Phaser将会调用isTerminated方法结束该Phaser
如果要实现多周期的情况,我们可以重写这个方法:
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
上面的例子中,如果phase次数超过了指定的iterations次数则就会自动终止
我们看下实际的例子:
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
上面的例子将会执行iterations次