摘要:java提供了synchronized关键字对临界区进行线程同步访问。由于synchronized 很难正确的编写同步代码,并发工具类提供了高级的同步器(控制通用同步方法的类)
本文主要介绍倒计时门闩(CountDownLatch)、同步屏障(cyclic barrier)、交换器(exchanger)、信号量(semaphore)以及phaser 同步器。
1.倒计时门闩(CountDownLatch)
CountDownLatch核心思想:倒计时门栓会导致一条或者多条线程一直在门口等待,知道另一条线程打开这扇们,线程才得以继续运行。它是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程一直等待直到计数变为0”以及“递减计数变量”
CountDownLatch核心方法:
void await():除非线程中断,否则强制调用线程一直等到计数倒数为0
boolean await(long timeout,TimeUnit unit):除非线程中断,否则一直强制调用线程一直等到计数倒数为0作者以unit作为timeout超时。
void CountDown():递减计数,当计数降至0时,释放所有的等待线程,当已经为0 时,什么也不会发生。
long getCount():返回当前的计数,用于测试和调试
String toString();返回一条标识这个门闩及其状态的字符串。
package multithreading; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchDemo { final static int nThreads=3; public static void main(String[] args) { // TODO Auto-generated method stub final CountDownLatch startFlag=new CountDownLatch(1); final CountDownLatch doneFlag=new CountDownLatch(nThreads); Runnable r=new Runnable() { @Override public void run() { // TODO Auto-generated method stub try { report("entered run"); startFlag.await(); report("doing work!"); Thread.sleep(2000); doneFlag.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } void report(String s) { // TODO Auto-generated method stub System.out.println(System.currentTimeMillis()+""+Thread.currentThread()+":"+s); } }; ExecutorService executor=Executors.newFixedThreadPool(nThreads); for(int i=0;i<nThreads;i++) { executor.execute(r); }; System.err.println("主线程开始执行"); try { Thread.sleep(1000); startFlag.countDown(); System.err.println("主线程do something else!"); doneFlag.await(); executor.shutdown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //每次执行完任务执行1秒 } }
运行结果:
注:startFlag门闩会在默认主线程结束之前禁止任何线程执行,endFlag线程会在默认3条主线程全部执行完。
2.同步屏障(CyclicBarrier)
核心思想:同步屏障允许一组线程彼此相互等待,知道抵达某个公共的屏障点,因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。
使用场景:数量固定并且相互之间必须不时等待彼此的多线程应用。
核心方法:int await():强调线程一直等待直到所有的parties都已经在同步屏障上调用await()方法。当调用线程自己或其他等待线程被中断、有线程在等待中超时或者有线程在同步屏障之上调用reset()方法,该调用线程就会停止等待。
如果调用线程在进入方法时设置过中断装袋或者等待时被中断,该方法就会抛出Interrupted Exception并且中断状态会被清除。
线程正在等待时该同步屏障被重置了(通过reset()方法),该方法就会抛出java.until.conrrent.BrokenBarrierException
调用线程是最后一条线程,并企稳构造函数提供了一个非空的barrierAction,这条线程就会允许其他线程继续执行之前率先执行的runnable。该方法就会返回调用线程到达索引,getParties()-1代表第一条到达的线程,0表示最后一条到达的线 程。
int wait(long timeout,TimeUnit unit):除了让你指定调用线程愿意等待的时长之外,该方法等同于之前的方法。 该线程在等待中超时,该方法会抛出java.util.concurrent.TimeoutException
int getNumberWaiting():返回当前在同步屏幕上等待的线程数目,该方法用于调试和断言
int getParties();返回需要跨越同步屏障的线程数目。
int isBroken();当一条或者多条线程由于在同步屏障创建或长刺重置之后,中断或者超时打破同步屏障,或者有因为一个异常导致barrier action 失败,返回true,否则返回false
void reset():把同步屏障重置到器原始状态。如果此时任意的线程等待在这个同步屏障上,就会抛出一个BrokenBarrier.注意,在由于某些原因发生 的跳出操作之后进行重置是非常难以实现的。线程需要通过一一些其他方式同步并挑选一条线程进 行同步操作。最好给后续的使用创建一个新的同步屏障。
package multithreading; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { /** * 使用同步屏障把一个任务分解成多个子任务 * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub float [][] matrix=new float[3][3]; int counter=0; for(int row=0;row<matrix.length;row++) { for(int col=0;col<matrix.length;col++) { matrix[row][col]=counter++; } } dump(matrix); System.out.println(); //为每一行分别建立一个线程进行计算 Solver solver=new Solver(matrix); System.out.println(solver.flag); dump(matrix); } //打印矩阵 static void dump(float[][] matrix) { // TODO Auto-generated method stub for(int row=0;row<matrix.length;row++) { for(int col=0;col<matrix.length;col++) { System.out.print(matrix[row][col]+" "); } System.out.println(); } } } class Solver { float[][] data; CyclicBarrier barrier ; boolean flag=false; public Solver(float[][] matrix) { // TODO Auto-generated constructor stub data = matrix; //创建3条同步屏障,在屏障跨越的时候启动 barrier = new CyclicBarrier(matrix.length, new Runnable() { @Override public void run() { new CyclicBarrierDemo(); // TODO Auto-generated method stub CyclicBarrierDemo.dump(matrix); } }); System.err.println("创建的线程条数"+matrix.length); for(int i=0;i<matrix.length;i++) { //启动线程 new Thread(new Worker(barrier,matrix,i)).start(); System.out.println("主线程正在等待..."); System.out.println("主线程被唤醒..."); } } }
package multithreading; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class Worker implements Runnable{ int myrow; boolean done=false; private float[][] data; CyclicBarrier barrier; public Worker(float[][] matrix, int i) { // TODO Auto-generated constructor stub } public Worker(CyclicBarrier barrier2, float[][] matrix, int i) { // TODO Auto-generated constructor stub myrow=i; data = matrix; barrier=barrier2; } @Override public void run() { // TODO Auto-generated method stub while(!done) { processRow(); //开始计算 try { System.out.println("++++++++++屏障等待线程条数"+barrier.getNumberWaiting()); barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block return; } catch (BrokenBarrierException e) { // TODO Auto-generated catch block return; } } } //开始计算 void processRow() { System.out.println("开始计算,行数为:"+myrow); for(int i=0;i<data.length;i++) { data[myrow][i]*=10; done=true; } System.err.println("第"+myrow+"行计算完毕"); } }
注:创建n条线程的同步屏障在跨越时启动
运行结果:
3.交换器
核心思想:交换器提供了一个线程彼此之间能够交换对象的同步点。每条线程都会往这个交换器的exchange()方法中传入一些对象,匹配伙伴线程,同时接受伙伴线程中的对象作为返回值。在诸如遗传算法和管道设计的应用程序中,交换器会很有用。
核心方法:
V exchange(V,x) :在这个交互点等待其他线程到达(除非调用线程中断),之后将所有的对象方法传入其中,接收其他线程的对象 作为返回。如果其他交换线程已经在交换点等待,为了线程调度,它会从中恢复并且会接收调用线程锁传入的对 象,当前线程会立即返回,接收其他线程传入交换器中的对象。当调用线程被中断了该方法会抛出InterruptedException
V exchange(V,x,long timeout,TimeUnit unit):除了让指定的调用线程愿意等待的时长之外,该方法等同于之前的方法。当线程在等待时,该方法会抛出TimeoutExeception
package Exchanger; import java.util.concurrent.Exchanger; public class ExechangeDemo { final static Exchanger<DataBuffer> exchanger=new Exchanger<DataBuffer>(); final static DataBuffer initialEmptyBuffer=new DataBuffer(); final static DataBuffer initialFullBuffer=new DataBuffer("I"); public static void main(String[] args) { // TODO Auto-generated method stub new Thread(new EmptyLoop(exchanger,initialFullBuffer)).start(); new Thread(new FillingLoop(exchanger,initialEmptyBuffer)).start(); } }
package Exchanger; import java.util.ArrayList; import java.util.List; public class DataBuffer { private final static int maxitem=10; private final List<String> items=new ArrayList<>(); public DataBuffer() { // TODO Auto-generated constructor stub } public DataBuffer(String prefix) { // TODO Auto-generated constructor stub for(int i=0;i<maxitem;i++) { String item=prefix+i; System.out.println("Adding"+item); items.add(item); } } //添加元素 synchronized void add(String s) { if(!isFull()) { items.add(s); } } //缓冲区元素是否 synchronized boolean isFull() { // TODO Auto-generated method stub return items.size()==maxitem; } synchronized String remove() { if(!isEmpty()) { return items.remove(0); } return null; } synchronized boolean isEmpty() { // TODO Auto-generated method stub return items.size()==0; } }
package Exchanger; import java.util.concurrent.Exchanger; public class FillingLoop implements Runnable{ int count=0; DataBuffer currentBuffer; final Exchanger<DataBuffer> exchanger; public FillingLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialemptybuffer) { // TODO Auto-generated constructor stub currentBuffer=initialemptybuffer; exchanger=exchanger2; } @Override public void run() { // TODO Auto-generated method stub try { while(true) { addTobuffer(currentBuffer); System.err.println(currentBuffer.isFull()); if(currentBuffer.isFull()) { System.out.println("filling thread wants to exchange..."); currentBuffer=exchanger.exchange(currentBuffer); System.err.println("filling thread recives exchange"); } } }catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println("filling thread interrupt.."); } } private void addTobuffer(DataBuffer buffer) { // TODO Auto-generated method stub String item="缓冲区填充->"+count++; System.out.println("Adding:"+item); buffer.add(item); } }
package Exchanger; import java.util.concurrent.Exchanger; public class EmptyLoop implements Runnable{ final Exchanger<DataBuffer> exchanger; DataBuffer currentBuffer; public EmptyLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialfullbuffer) { // TODO Auto-generated constructor stub currentBuffer=initialfullbuffer; exchanger=exchanger2; } @Override public void run() { // TODO Auto-generated method stub try { while(true) { takeFromBuffer(currentBuffer); if(currentBuffer.isEmpty()) { System.out.println("empty thread wants to exchange"); currentBuffer=exchanger.exchange(currentBuffer); } System.err.println("emptying thread recives exchage"); } }catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println("emptying thread interrupted"); } } void takeFromBuffer(DataBuffer buffer) { // TODO Auto-generated method stub System.out.println("缓冲区取数据->:"+buffer.remove()); } }
运行结果:
4.信号量
5.信号量和公平策略