?
CountDownLatch
public class CountDownLatchTest {
private CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
private static int THREAD_COUNT = 3;
@Test
public void test() throws InterruptedException {
long startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(()->{
if(Thread.currentThread().getName().equals("thread_1")){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
},"thread_"+i).start();
}
countDownLatch.await();
long cost = System.currentTimeMillis() - startTime;
Assert.assertTrue(cost>2000);
}
}
CyslicBarrier
public class CyclicBarrierTest {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT);
private static final int THREAD_COUNT = 3;
@Test
public void test(){
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(()->{
long startTime = System.currentTimeMillis();
try {
if(Thread.currentThread().getName().equals("thread_1")){
TimeUnit.SECONDS.sleep(2);
}
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
long cost = System.currentTimeMillis() - startTime;
Assert.assertTrue(cost>2000);
System.out.println(Thread.currentThread().getName()+"执行了 "+cost+"ms");
},"thread_"+i).start();
}
while(true){}
}
}
Exchanger
public class ExchangeTest {
Exchanger exchanger = new Exchanger();
@Test
public void test(){
Thread thread = new Thread(new M());
thread.start();
Thread thread1 = new Thread(new N());
thread1.start();
while(true){}
}
class M implements Runnable{
private String m_value = "m类的变量";
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
Object exchange = exchanger.exchange(m_value);
System.out.println("M类-交换后的值:"+exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class N implements Runnable{
private int n_value = 1;
@Override
public void run() {
try {
Object exchange = exchanger.exchange(n_value);
System.out.println("N类-交换后的值:"+exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ForkJoin
public class ForkJoinTest {
//处理的最大任务
private static final int MAX = 200;
static class MyForkJoinTask extends RecursiveTask<Integer>{
//子任务开始计算的元素
private Integer startValue;
//子任务结束计算的元素
private Integer endValue;
public MyForkJoinTask(Integer startValue, Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
if(endValue-startValue < MAX){
System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue);
return addTotal(startValue,endValue);
}else{
MyForkJoinTask myForkJoinTask1 = new MyForkJoinTask(startValue, (startValue+endValue)/2);
myForkJoinTask1.fork();
MyForkJoinTask myForkJoinTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1, endValue);
myForkJoinTask2.fork();
return myForkJoinTask1.join()+myForkJoinTask2.join();
}
}
private Integer addTotal(Integer startValue, Integer endValue){
Integer total = 0;
for(int i = startValue; i<=endValue; i++){
total+=i;
}
return total;
}
}
@Test
public void test() throws Exception{
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> task = forkJoinPool.submit(new MyForkJoinTask(1, 1001));
Integer integer = task.get();
System.out.println(integer);
}
}
Semaphore
public class SemaphoreTest {
private static int PARALLEL_COUNT = 3; //并行数量
private static int THREAD_COUNT = 10 * PARALLEL_COUNT;//线程数量
private Semaphore semaphore = new Semaphore(PARALLEL_COUNT);
private CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
@Test
public void test() throws InterruptedException {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(()->{
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"开过");
semaphore.release();
countDownLatch.countDown();
if(countDownLatch.getCount()%PARALLEL_COUNT == 0){
System.out.println("-------------");//每三条打印一次
}
}, "车"+i).start();
}
countDownLatch.await();
System.out.println("---------所有车开过了-------");
}
}
Phaser
public class PhaserTest {
Phaser phaseMarry = new Phaser(6){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase){
case 0:
System.out.println(registeredParties+"个人到达现场");
return false;
case 1:
System.out.println(registeredParties+"个人用餐完毕");
return false;
case 2:
System.out.println(registeredParties+"个人开始洞房");
return false;
default:
return true;
}
}
};
@Test
public void test(){
int COUNT = 4;
for (int i = 0; i < COUNT; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"到达");
phaseMarry.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"吃饭");
phaseMarry.arriveAndAwaitAdvance();
phaseMarry.arriveAndDeregister();
},"嘉宾"+COUNT).start();
}
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"到达");
phaseMarry.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"吃饭");
phaseMarry.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"洞房");
phaseMarry.arriveAndAwaitAdvance();
},"新郎").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"到达");
phaseMarry.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"吃饭");
phaseMarry.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"洞房");
phaseMarry.arriveAndAwaitAdvance();
},"新娘").start();
}
}
?