Java多线程之Phaser使用说明

package com.abc.test

import org.junit.Assert
import org.junit.Test
import java.util.concurrent.*

/**

 */
class PhaserTest {

    /**
     * Phaser 除具有CyclicBarrier的所有功能外,还多加了以下功能:
     * 1 允许参与者到达一个目标点之后退出.
     * 2 允许线程只在某一轮等待.
     * 3 动态的增加参与者数量.
     *
     * 以下指定Phaser的参与者数量是3.
     * 运作之后可以得出结论:
     * 只有打印了“3: before arriveAndAwaitAdvance”  之后 ,
     * 线程1 和线程2 的  phaser.arriveAndAwaitAdvance() 之后的代码才执行.
     * 可以尝试修改线程3的sleep时间来看效果.
     */
    @Test
    fun tesArriveAndAwaitAdvance() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance()
            println("3 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试arriveAndDeregister()方法:
     * 允许一个参与者到达一个目标点之后退出,同时这个Phaser的参与者数量减一.
     * 以下指定Phaser的参与者数量是3。
     * 开始时参与者线程是1,2,3,
     * 线程3到达第一个目标之后退出,Phaser的参与者数量减为2,
     * 然后线程4和5开始,可以通过日志看出线程5执行arriveAndAwaitAdvance之后立即执行线程5和线程4的arriveAndAwaitAdvance之后的代码,
     * 说明Phaser的参与者数量在线程3调用arriveAndDeregister()方法之后已经减为2。
     */
    @Test
    fun tesArriveAndDeregister() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)

        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.registeredParties)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.registeredParties)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(3, phaser.registeredParties)
            phaser.arriveAndDeregister()
            Assert.assertEquals(2, phaser.registeredParties)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            println("5 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试Phase:
     * Phase指这个Phaser已经跑了第几圈了。
     * 我们可以设想在一个环形跑道,很多线程在这个跑道上跑,所有的参与者到达栅栏一次
     * 那么这个Phaser的Phase值就加1,以此类推.
     * 线程1和线程2以及线程3调用arriveAndAwaitAdvance()之后就是第一圈
     * 线程4和线程5调用arriveAndAwaitAdvance()之后就是第二圈了,
     * 所以phase是2。
     */
    @Test
    fun tesPhase() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndDeregister()
            Assert.assertEquals(1, phaser.phase)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("5 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }


    /**
     * 以下测试awaitAdvance():
     * 依然基于上面的代码,增加了线程6,
     * 线程6调用phaser.awaitAdvance(1),意思是等待phaser的phase值由1变为2,
     * 如果当前phase的值不等于1立即返回.
     * 也就是线程6在参与者线程全部跑完第一圈之后等待参与者线程全部跑完第二圈.
     */
    @Test
    fun testAwaitAdvance() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("1 is  over")
        }
        executor.execute {
            println("2")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(1, phaser.phase)
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(2000)
            println("3: before arriveAndAwaitAdvance")
            Assert.assertEquals(0, phaser.phase)
            phaser.arriveAndDeregister()
            Assert.assertEquals(1, phaser.phase)
            println("3 is  over")
        }

        Thread.sleep(4000)

        executor.execute {
            println("4")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            Assert.assertEquals(2, phaser.phase)
            println("5 is  over")
        }
        executor.execute {
            println("6")
            Assert.assertEquals(1, phaser.phase)
            phaser.awaitAdvance(1)
            Assert.assertEquals(2, phaser.phase)
            println("6 is  over")
        }

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }

    /**
     * 以下测试register():
     * 基于tesArriveAndDeregister()的代码,增加了线程6,
     * Phaser.register()方法是增加一个参与者数量,现在数量是3.
     * 那么线程4,线程5和线程6调用arriveAndAwaitAdvance之后就满足通行条件,
     * 线程4,线程5和线程6继续执行arriveAndAwaitAdvance()之后的代码.
     */
    @Test
    fun testRegister() {
        val executor = Executors.newFixedThreadPool(5)
        val phaser = Phaser(3)
        executor.execute {
            println("1")
            phaser.arriveAndAwaitAdvance();
            println("1 is  over")
        }
        executor.execute {
            println("2")
            phaser.arriveAndAwaitAdvance();
            println("2 is  over")
        }
        executor.execute {
            println("3")
            Thread.sleep(500)
            println("3: before arriveAndAwaitAdvance")
            phaser.arriveAndDeregister()
            println("3 is  over")
        }

        Thread.sleep(1000)

        Assert.assertEquals(phaser.registeredParties, 2)

        executor.execute {
            Thread.sleep(1000)
            println("4")
            phaser.arriveAndAwaitAdvance();
            println("4 is  over")
        }
        executor.execute {
            println("5")
            Thread.sleep(1000)
            println("5: before arriveAndAwaitAdvance")
            phaser.arriveAndAwaitAdvance();
            println("5 is  over")
        }
        executor.execute {
            println("6")
            Assert.assertEquals(phaser.unarrivedParties, 2)
            phaser.register()
            Assert.assertEquals(phaser.unarrivedParties, 3)
            phaser.arriveAndAwaitAdvance();
            println("6 is  over")
        }

        Thread.sleep(2000)
        Assert.assertEquals(phaser.unarrivedParties, 3)

        executor.shutdown()
        executor.awaitTermination(100, TimeUnit.DAYS)
    }


}
上一篇:Java ArrayBlockingQueue 入门指南


下一篇:Java ★ 灵魂拷问:到底要不要写单元测试,如何正确进行单元测试?