下面介绍一些线程编程例子:
1.有一个任务(有三个阶段-前期准备,任务完成,后期检查),要求多个工作者参与共同完成,每个阶段必须所有的工作者完成后才可以进行下一个阶段,三个阶段都完成,总部(一个特殊的工作者)完成后期总结。
CyclicBarrier:故障点,可以重复使用---await
static class SummaryService { private Random random; SummaryService(Random random) { this.random = random; } void doService(String name, String phase) { System.out .println("-----------------------------------------------"); System.out.println("任务阶段:" + phase + ",当期工作者:" + name + ",正在汇总数据"); try { Thread.currentThread().sleep(random.nextInt(300)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务阶段:" + phase + ",当期工作者:" + name + ",汇总数据完成"); System.out .println("-----------------------------------------------"); } }
static class DataTask implements Runnable { private String name; private CyclicBarrier barrier; private SummaryService service; DataTask(String name, CyclicBarrier barrier, SummaryService service) { this.name = name; this.barrier = barrier; this.service = service; } @Override public void run() { service.doService(name, "前期任务准备"); System.out.println("体系单位:[" + name + "]汇总[前期任务准备都已完成],等待进入[完成任务阶段],未完成前期的单位还有:" + barrier.getNumberWaiting() + "个"); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } service.doService(name, "完成任务"); System.out.println("体系单位:[" + name + "]汇总[完成任务工作都已完成],等待进入[监控任务阶段],未完成前期的单位还有:" + barrier.getNumberWaiting() + "个"); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } service.doService(name, "监控任务阶段"); System.out.println("体系单位:[" + name + "]汇总[监控任务阶段都已完成]" + barrier.getNumberWaiting() + "个"); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
public static void main(String[] args) throws InterruptedException, IOException { CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { System.out.println("总部汇总工作!!!"); } }); SummaryService service = new SummaryService(new Random()); new Thread(new DataTask("A单位", barrier, service)).start(); new Thread(new DataTask("B单位", barrier, service)).start(); new Thread(new DataTask("C单位", barrier, service)).start(); }
2.CountDownLatch --await(等待所有的任务完成),countDown(某一个任务已经完成)-不可重复使用
static class CTask implements Runnable { private String name; private CountDownLatch countDownLatch; CTask(String name,CountDownLatch countDownLatch) { this.name = name; this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("工作者:" + name + "处理任务...."); countDownLatch.countDown(); } }
public static void main(String[] args) throws InterruptedException, IOException { final CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(new CTask("小云", countDownLatch )).start(); new Thread(new CTask("小航", countDownLatch )).start(); new Thread(new CTask("小月", countDownLatch )).start(); countDownLatch.await(); System.out.println("任务都已完成!!"); }
3.exchanger-交换器(分片思想)
public static void main(String[] args) throws InterruptedException, IOException { final Exchanger exchanger = new Exchanger(); new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().sleep(1000); System.out.println("换出数据:A,换回的数据:" + exchanger.exchange("A") ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { System.out.println("换出数据:B,换回的数据:" + exchanger.exchange("B") ); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }