多线程并发工具类

1.等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。即他可以实现与join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch时传入一个int参数来设置初始计数值,任何在CountDownLatch对象上调用wait()的方法都将被阻塞,直到这个CountDownLatch对象的计数值为0。CountDownLatch被设计为只能触发一次,计数值不能被重置。

    当我们调用CountDownLatch的countDown方法时,计数值N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。

    注意:计数器必须大于等于0,只是等于0时候,计数器就是零,则此时调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法发生之前,另外一个线程调用await方法。一个线程调用countDown方法并不会被阻塞,只有调用await()方法的线程才会被阻塞。

public class TestCountDownLatch {
	static CountDownLatch c=new CountDownLatch(8);
	public static void main(String[] args) throws InterruptedException {
		for(int i=1;i<=8;i++){
			Thread t=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName()+"完成");
						c.countDown();//计数器减1
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			t.start();
		}
		c.await();//主线程会在此处阻塞,直到CountDownLatch的计数器为0才会恢复
		System.out.println("完成所有准备任务");
		System.out.println("主程序开始执行");
	}
}

2.同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

    注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待, 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,保证会优先执行barrierAction,方便处理更复杂的业务场景。

public class TestCyclicBarrier1 {
	static CyclicBarrier c = new CyclicBarrier(2, new A());
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					c.await();
					System.out.println(2);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		}).start();
		c.await();
		System.out.println(1);
	}
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}
/*
 * 输出结果:
 * 3
 * 2
 * 1
 */

    因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A 都执行完之后,才会继续执行主线程,所以输出结果为3 2 1。那么此时有一个问题,如果阻塞的线程数大于CyclicBarrier的计数器会怎样?

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		for(int i=1;i<=3;i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		}).start();
		}
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

根据结果可以知道,CyclicBarrier可以自动重置计数器数量,当拦截线程数量为2时会把从阻塞队列中任意取出两个解除阻塞并执行,如果还有剩余的阻塞队列则会重置计数器,如果剩余阻塞队列数量小于计数器则会阻塞运行,也就是说,如果有阻塞队列数X与计数器N,X%N==0,那么所有线程都会执行,如果X%N!=0,那么会有部分线程处于阻塞状态无法执行。也可以手动调用 reset()方法来进行重置计数器。

    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		Thread t=new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		});
		t.start();
		t.interrupt();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
			System.out.println(2);
		} catch (BrokenBarrierException|InterruptedException e) {
			System.out.println(c.isBroken());
		}
		
	}

}
/*
true
*/

3.控制并发线程数的Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。换句话说,锁(Lock锁或synchronized锁)在任何时刻只允许一个任务访问被加锁的资源,而计数信号量允许n个任务同时访问这个资源,还可以将信号量看作是向外分发使用资源的“许可证”,尽管内部没有这种许可证对象。

    “许可证”的数量是有限的,所以当执有“许可证”的线程数量与“许可证”数量相同时,就会阻止其他线程对共享资源的使用,如果某一个或多个线程使用完共享资源后,就会归还“许可证”,此时Semaphore(信号量)就会将这些归还的“许可证”再次分发给阻塞中的线程。通过这种方式就实现了控制线程并发数。

    3.1 API

  • Semaphore(int permits):构造器,接受一个整型的数字,表示可用的许可证数量。
  • acquire():线程调用该方法获取一个许可证来获取使用共享资源的资格。
  • release():线程使用完共享资源之后调用方法归还许可证。
  • tryAcquire():线程调用该方法尝试获取许可证。
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方 法。

    3.2 应用

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制。

public class TestSemaphore {
	static Semaphore s=new Semaphore(10);
	static int threadNum=30;
	public static void main(String[] args) {
		for(int i=1;i<=30;i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println(Thread.currentThread().getName()+"do some work");
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			}).start();
		}
	}
}

4. 线程间交换数据的Exchanger

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。所以由此可见,Exchanger将会与 生产者-消费者模型相关。

    其应用场景有:Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需 要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行 录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

public class TestExchanger {
	static Exchanger<String> ex=new Exchanger<String>();
	static ExecutorService service=Executors.newFixedThreadPool(2);
	public static void main(String[] args) {
		service.execute(new Runnable() {
			@Override
			public void run() {
				String a="A";
				try {
					ex.exchange(a);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				String b="B";
				try {
					String a=ex.exchange(b);
					System.out.println("a录入的是"+a);
					System.out.println("b录入的是"+b);
					System.out.println("a与b是否一致:"+a.equalsIgnoreCase(b));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.shutdown();
	}
}

 

上一篇:深入学习Lock锁(5)——Condition接口应用与分析


下一篇:kvm虚拟化学习笔记(十七)之KVM到KVM之v2v迁移