package com.abc.test
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.*
/**
*/
class PhaserTest {
/**
* Phaser 除具有CyclicBarrier的所有功能外,还多加了以下功能:
* 1 允许参与者到达一个目标点之后退出.
* 2 允许线程只在某一轮等待.
* 3 动态的增加参与者数量.
*
* 以下指定Phaser的参与者数量是3.
* 运作之后可以得出结论:
* 只有打印了“3: before arriveAndAwaitAdvance” 之后 ,
* 线程1 和线程2 的 phaser.arriveAndAwaitAdvance() 之后的代码才执行.
* 可以尝试修改线程3的sleep时间来看效果.
*/
@Test
fun tesArriveAndAwaitAdvance() {
val executor = Executors.newFixedThreadPool(5)
val phaser = Phaser(3)
executor.execute {
println("1")
phaser.arriveAndAwaitAdvance();
println("1 is over")
}
executor.execute {
println("2")
phaser.arriveAndAwaitAdvance();
println("2 is over")
}
executor.execute {
println("3")
Thread.sleep(2000)
println("3: before arriveAndAwaitAdvance")
phaser.arriveAndAwaitAdvance()
println("3 is over")
}
executor.shutdown()
executor.awaitTermination(100, TimeUnit.DAYS)
}
/**
* 以下测试arriveAndDeregister()方法:
* 允许一个参与者到达一个目标点之后退出,同时这个Phaser的参与者数量减一.
* 以下指定Phaser的参与者数量是3。
* 开始时参与者线程是1,2,3,
* 线程3到达第一个目标之后退出,Phaser的参与者数量减为2,
* 然后线程4和5开始,可以通过日志看出线程5执行arriveAndAwaitAdvance之后立即执行线程5和线程4的arriveAndAwaitAdvance之后的代码,
* 说明Phaser的参与者数量在线程3调用arriveAndDeregister()方法之后已经减为2。
*/
@Test
fun tesArriveAndDeregister() {
val executor = Executors.newFixedThreadPool(5)
val phaser = Phaser(3)
executor.execute {
println("1")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.registeredParties)
println("1 is over")
}
executor.execute {
println("2")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.registeredParties)
println("2 is over")
}
executor.execute {
println("3")
Thread.sleep(2000)
println("3: before arriveAndAwaitAdvance")
Assert.assertEquals(3, phaser.registeredParties)
phaser.arriveAndDeregister()
Assert.assertEquals(2, phaser.registeredParties)
println("3 is over")
}
Thread.sleep(4000)
executor.execute {
println("4")
phaser.arriveAndAwaitAdvance();
println("4 is over")
}
executor.execute {
println("5")
Thread.sleep(1000)
println("5: before arriveAndAwaitAdvance")
phaser.arriveAndAwaitAdvance();
println("5 is over")
}
executor.shutdown()
executor.awaitTermination(100, TimeUnit.DAYS)
}
/**
* 以下测试Phase:
* Phase指这个Phaser已经跑了第几圈了。
* 我们可以设想在一个环形跑道,很多线程在这个跑道上跑,所有的参与者到达栅栏一次
* 那么这个Phaser的Phase值就加1,以此类推.
* 线程1和线程2以及线程3调用arriveAndAwaitAdvance()之后就是第一圈
* 线程4和线程5调用arriveAndAwaitAdvance()之后就是第二圈了,
* 所以phase是2。
*/
@Test
fun tesPhase() {
val executor = Executors.newFixedThreadPool(5)
val phaser = Phaser(3)
executor.execute {
println("1")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(1, phaser.phase)
println("1 is over")
}
executor.execute {
println("2")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(1, phaser.phase)
println("2 is over")
}
executor.execute {
println("3")
Thread.sleep(2000)
println("3: before arriveAndAwaitAdvance")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndDeregister()
Assert.assertEquals(1, phaser.phase)
println("3 is over")
}
Thread.sleep(4000)
executor.execute {
println("4")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.phase)
println("4 is over")
}
executor.execute {
println("5")
Thread.sleep(1000)
println("5: before arriveAndAwaitAdvance")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.phase)
println("5 is over")
}
executor.shutdown()
executor.awaitTermination(100, TimeUnit.DAYS)
}
/**
* 以下测试awaitAdvance():
* 依然基于上面的代码,增加了线程6,
* 线程6调用phaser.awaitAdvance(1),意思是等待phaser的phase值由1变为2,
* 如果当前phase的值不等于1立即返回.
* 也就是线程6在参与者线程全部跑完第一圈之后等待参与者线程全部跑完第二圈.
*/
@Test
fun testAwaitAdvance() {
val executor = Executors.newFixedThreadPool(5)
val phaser = Phaser(3)
executor.execute {
println("1")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(1, phaser.phase)
println("1 is over")
}
executor.execute {
println("2")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(1, phaser.phase)
println("2 is over")
}
executor.execute {
println("3")
Thread.sleep(2000)
println("3: before arriveAndAwaitAdvance")
Assert.assertEquals(0, phaser.phase)
phaser.arriveAndDeregister()
Assert.assertEquals(1, phaser.phase)
println("3 is over")
}
Thread.sleep(4000)
executor.execute {
println("4")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.phase)
println("4 is over")
}
executor.execute {
println("5")
Thread.sleep(1000)
println("5: before arriveAndAwaitAdvance")
phaser.arriveAndAwaitAdvance();
Assert.assertEquals(2, phaser.phase)
println("5 is over")
}
executor.execute {
println("6")
Assert.assertEquals(1, phaser.phase)
phaser.awaitAdvance(1)
Assert.assertEquals(2, phaser.phase)
println("6 is over")
}
executor.shutdown()
executor.awaitTermination(100, TimeUnit.DAYS)
}
/**
* 以下测试register():
* 基于tesArriveAndDeregister()的代码,增加了线程6,
* Phaser.register()方法是增加一个参与者数量,现在数量是3.
* 那么线程4,线程5和线程6调用arriveAndAwaitAdvance之后就满足通行条件,
* 线程4,线程5和线程6继续执行arriveAndAwaitAdvance()之后的代码.
*/
@Test
fun testRegister() {
val executor = Executors.newFixedThreadPool(5)
val phaser = Phaser(3)
executor.execute {
println("1")
phaser.arriveAndAwaitAdvance();
println("1 is over")
}
executor.execute {
println("2")
phaser.arriveAndAwaitAdvance();
println("2 is over")
}
executor.execute {
println("3")
Thread.sleep(500)
println("3: before arriveAndAwaitAdvance")
phaser.arriveAndDeregister()
println("3 is over")
}
Thread.sleep(1000)
Assert.assertEquals(phaser.registeredParties, 2)
executor.execute {
Thread.sleep(1000)
println("4")
phaser.arriveAndAwaitAdvance();
println("4 is over")
}
executor.execute {
println("5")
Thread.sleep(1000)
println("5: before arriveAndAwaitAdvance")
phaser.arriveAndAwaitAdvance();
println("5 is over")
}
executor.execute {
println("6")
Assert.assertEquals(phaser.unarrivedParties, 2)
phaser.register()
Assert.assertEquals(phaser.unarrivedParties, 3)
phaser.arriveAndAwaitAdvance();
println("6 is over")
}
Thread.sleep(2000)
Assert.assertEquals(phaser.unarrivedParties, 3)
executor.shutdown()
executor.awaitTermination(100, TimeUnit.DAYS)
}
}