java同步器

摘要:java提供了synchronized关键字对临界区进行线程同步访问。由于synchronized 很难正确的编写同步代码,并发工具类提供了高级的同步器(控制通用同步方法的类)

           本文主要介绍倒计时门闩(CountDownLatch)、同步屏障(cyclic barrier)、交换器(exchanger)、信号量(semaphore)以及phaser 同步器

1.倒计时门闩(CountDownLatch)

CountDownLatch核心思想:倒计时门栓会导致一条或者多条线程一直在门口等待,知道另一条线程打开这扇们,线程才得以继续运行。它是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程一直等待直到计数变为0”以及“递减计数变量”

CountDownLatch核心方法:

          void await():除非线程中断,否则强制调用线程一直等到计数倒数为0

          boolean await(long timeout,TimeUnit unit):除非线程中断,否则一直强制调用线程一直等到计数倒数为0作者以unit作为timeout超时。

          void CountDown():递减计数,当计数降至0时,释放所有的等待线程,当已经为0 时,什么也不会发生。

          long getCount():返回当前的计数,用于测试和调试

          String toString();返回一条标识这个门闩及其状态的字符串。 

package multithreading;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {
    final static int nThreads=3;
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		final CountDownLatch startFlag=new CountDownLatch(1);
		final CountDownLatch doneFlag=new CountDownLatch(nThreads);
		Runnable r=new Runnable() {
			
			@Override
			public void run() {
				// TODO Auto-generated method stub
					try {
						    report("entered run");
							startFlag.await();
							report("doing work!");
							Thread.sleep(2000);
							doneFlag.countDown();
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
						    e.printStackTrace();
					}
			}

				    void report(String s) {
						// TODO Auto-generated method stub
						System.out.println(System.currentTimeMillis()+""+Thread.currentThread()+":"+s);
					}
	    };
        ExecutorService executor=Executors.newFixedThreadPool(nThreads);
		for(int i=0;i<nThreads;i++) {
			executor.execute(r);
		};
		System.err.println("主线程开始执行");
			try {
				Thread.sleep(1000);
				startFlag.countDown();
				System.err.println("主线程do something else!");
				doneFlag.await();
				executor.shutdown();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}              //每次执行完任务执行1秒
			
		
	}
}

  运行结果:

java同步器

 

 

 注:startFlag门闩会在默认主线程结束之前禁止任何线程执行,endFlag线程会在默认3条主线程全部执行完。

2.同步屏障(CyclicBarrier)

  核心思想:同步屏障允许一组线程彼此相互等待,知道抵达某个公共的屏障点,因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。

 使用场景:数量固定并且相互之间必须不时等待彼此的多线程应用。

 核心方法:int await():强调线程一直等待直到所有的parties都已经在同步屏障上调用await()方法。当调用线程自己或其他等待线程被中断、有线程在等待中超时或者有线程在同步屏障之上调用reset()方法,该调用线程就会停止等待。

                   如果调用线程在进入方法时设置过中断装袋或者等待时被中断,该方法就会抛出Interrupted Exception并且中断状态会被清除。

                   线程正在等待时该同步屏障被重置了(通过reset()方法),该方法就会抛出java.until.conrrent.BrokenBarrierException

                  调用线程是最后一条线程,并企稳构造函数提供了一个非空的barrierAction,这条线程就会允许其他线程继续执行之前率先执行的runnable。该方法就会返回调用线程到达索引,getParties()-1代表第一条到达的线程,0表示最后一条到达的线                        程。

                  int wait(long timeout,TimeUnit unit):除了让你指定调用线程愿意等待的时长之外,该方法等同于之前的方法。  该线程在等待中超时,该方法会抛出java.util.concurrent.TimeoutException

                  int getNumberWaiting():返回当前在同步屏幕上等待的线程数目,该方法用于调试和断言

                  int getParties();返回需要跨越同步屏障的线程数目。 

                  int  isBroken();当一条或者多条线程由于在同步屏障创建或长刺重置之后,中断或者超时打破同步屏障,或者有因为一个异常导致barrier action 失败,返回true,否则返回false

                  void reset():把同步屏障重置到器原始状态。如果此时任意的线程等待在这个同步屏障上,就会抛出一个BrokenBarrier.注意,在由于某些原因发生 的跳出操作之后进行重置是非常难以实现的。线程需要通过一一些其他方式同步并挑选一条线程进                                        行同步操作。最好给后续的使用创建一个新的同步屏障。

                 

package multithreading;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    /**
     * 使用同步屏障把一个任务分解成多个子任务
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        float [][] matrix=new float[3][3];
        int counter=0;
        for(int row=0;row<matrix.length;row++) {
            for(int col=0;col<matrix.length;col++) {
                matrix[row][col]=counter++;
            }
        }
        dump(matrix);
        System.out.println();
        //为每一行分别建立一个线程进行计算
        Solver solver=new Solver(matrix); 
        System.out.println(solver.flag);
        dump(matrix);
    }
    //打印矩阵
     static void dump(float[][] matrix) {
        // TODO Auto-generated method stub
        for(int row=0;row<matrix.length;row++) {
            for(int col=0;col<matrix.length;col++) {
                System.out.print(matrix[row][col]+"  ");
            }
            System.out.println();
        }
    }

}

 class Solver {
     float[][] data;
     CyclicBarrier barrier ;
     boolean flag=false;
     public Solver(float[][] matrix) {
        // TODO Auto-generated constructor stub
         data = matrix;
        //创建3条同步屏障,在屏障跨越的时候启动
         barrier = new CyclicBarrier(matrix.length, new Runnable() {

            @Override
            public void run() {
                new CyclicBarrierDemo();
                // TODO Auto-generated method stub
                CyclicBarrierDemo.dump(matrix);
            }
         });
         System.err.println("创建的线程条数"+matrix.length);
         for(int i=0;i<matrix.length;i++) {
             //启动线程
             new Thread(new Worker(barrier,matrix,i)).start(); 
             
             System.out.println("主线程正在等待...");
             
             System.out.println("主线程被唤醒...");
         }
    }


}
package multithreading;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Worker implements  Runnable{
    int myrow;
    boolean done=false;
    private float[][] data;
    CyclicBarrier barrier;

    public Worker(float[][] matrix, int i) {
        // TODO Auto-generated constructor stub
        
    }

    public Worker(CyclicBarrier barrier2, float[][] matrix, int i) {
        // TODO Auto-generated constructor stub
        myrow=i;
        data = matrix;
        barrier=barrier2;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        while(!done) {
            processRow();   //开始计算
            try {
                System.out.println("++++++++++屏障等待线程条数"+barrier.getNumberWaiting());
                barrier.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                return;
            } catch (BrokenBarrierException e) {
                // TODO Auto-generated catch block
                return;
            }
        }
    }
   
    //开始计算
    void processRow() {
        System.out.println("开始计算,行数为:"+myrow);
        for(int i=0;i<data.length;i++) {
            data[myrow][i]*=10;
            done=true;
        }
        System.err.println("第"+myrow+"行计算完毕");
    }
   
        
    
}

注:创建n条线程的同步屏障在跨越时启动

运行结果:

 java同步器

3.交换器

  核心思想:交换器提供了一个线程彼此之间能够交换对象的同步点。每条线程都会往这个交换器的exchange()方法中传入一些对象,匹配伙伴线程,同时接受伙伴线程中的对象作为返回值。在诸如遗传算法和管道设计的应用程序中,交换器会很有用。

  核心方法:

                V exchange(V,x) :在这个交互点等待其他线程到达(除非调用线程中断),之后将所有的对象方法传入其中,接收其他线程的对象 作为返回。如果其他交换线程已经在交换点等待,为了线程调度,它会从中恢复并且会接收调用线程锁传入的对                                                象,当前线程会立即返回,接收其他线程传入交换器中的对象。当调用线程被中断了该方法会抛出InterruptedException

               V exchange(V,x,long timeout,TimeUnit unit):除了让指定的调用线程愿意等待的时长之外,该方法等同于之前的方法。当线程在等待时,该方法会抛出TimeoutExeception

package Exchanger;

import java.util.concurrent.Exchanger;

public class ExechangeDemo {
    final static Exchanger<DataBuffer> exchanger=new Exchanger<DataBuffer>();
    final static DataBuffer initialEmptyBuffer=new DataBuffer();
    final static DataBuffer initialFullBuffer=new DataBuffer("I");
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new Thread(new EmptyLoop(exchanger,initialFullBuffer)).start();
        new Thread(new FillingLoop(exchanger,initialEmptyBuffer)).start();
    } 

}
package Exchanger;

import java.util.ArrayList;
import java.util.List;

public class DataBuffer {
    private final static int maxitem=10;
    private final List<String> items=new ArrayList<>();
    
    public DataBuffer() {
        // TODO Auto-generated constructor stub
    }
    public DataBuffer(String prefix) {
        // TODO Auto-generated constructor stub
        for(int i=0;i<maxitem;i++) {
            String item=prefix+i;
            System.out.println("Adding"+item);
            items.add(item);
        }
    }
    //添加元素
    synchronized void add(String s) {
        if(!isFull()) {
            items.add(s);
        }
    }
    //缓冲区元素是否
    synchronized boolean isFull() {
        // TODO Auto-generated method stub
        return items.size()==maxitem;
    }
    synchronized String remove() {
        if(!isEmpty()) {
            return items.remove(0);
        }
        return null;
        
    }
    synchronized boolean isEmpty() {
        // TODO Auto-generated method stub
        return items.size()==0;
        
    }

}
package Exchanger;

import java.util.concurrent.Exchanger;

public class FillingLoop implements Runnable{
    int count=0;
    DataBuffer currentBuffer;
    final Exchanger<DataBuffer> exchanger;    
    public FillingLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialemptybuffer) {
        // TODO Auto-generated constructor stub
        currentBuffer=initialemptybuffer;
        exchanger=exchanger2;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        
      try {
         while(true) {
            addTobuffer(currentBuffer);
            System.err.println(currentBuffer.isFull());
            if(currentBuffer.isFull()) {
                  System.out.println("filling thread wants to exchange...");
                  currentBuffer=exchanger.exchange(currentBuffer);
                  System.err.println("filling thread recives exchange");
                } 
            }
        }catch (InterruptedException e) {
            // TODO Auto-generated catch block
             System.err.println("filling thread interrupt..");
        }
    }
    private void addTobuffer(DataBuffer buffer) {
        // TODO Auto-generated method stub
        String item="缓冲区填充->"+count++;
        System.out.println("Adding:"+item);
        buffer.add(item);
        
    }

}
package Exchanger;

import java.util.concurrent.Exchanger;

public class EmptyLoop implements Runnable{
    
    final Exchanger<DataBuffer> exchanger;
    DataBuffer currentBuffer;
    public EmptyLoop(Exchanger<DataBuffer> exchanger2, DataBuffer initialfullbuffer) {
        // TODO Auto-generated constructor stub
        currentBuffer=initialfullbuffer;
        exchanger=exchanger2;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
             while(true) {
               takeFromBuffer(currentBuffer);
               if(currentBuffer.isEmpty()) {
                    System.out.println("empty thread wants to exchange");
                    currentBuffer=exchanger.exchange(currentBuffer);
                } 
               System.err.println("emptying thread recives exchage");
            }
        }catch (InterruptedException e) {
            // TODO Auto-generated catch block
            System.err.println("emptying thread interrupted");
        }
    }

    void takeFromBuffer(DataBuffer buffer) {
        // TODO Auto-generated method stub
        System.out.println("缓冲区取数据->:"+buffer.remove());
    }
}

运行结果:

java同步器

 

 4.信号量

5.信号量和公平策略

 

 

java同步器

 

上一篇:list类里面的东西加锁 (手动加锁方法)


下一篇:Laravel 7.4 发布