J.U.C 并发工具
aqs共享锁应用
1,CountDownLatch
计数器倒计时
方法:
await
countdown
说明:1,static CountDownLatch countDownLatch = new CountDownLatch(3) 表示控制三个线程的操作
2,main线程里面调用countDownLatch.await() 阻塞 ,每次线程调用countDown() 方法 的时候,计数器-1 当计数器 = 0 时 唤醒main线程
代码示例
public class CountDownLatchDemo { static CountDownLatch countDownLatch = new CountDownLatch( 3 ); public static void main(String[] args) { Thread1 t1 = new Thread1(); t1.start(); Thread2 t2 = new Thread2(); t2.start(); Thread3 t3 = new Thread3(); t3.start(); try { //该处阻塞主线程 等到各自的线程做完了自己的事情 //当3个线程全部执行完成 countDownLatch = 0 唤醒main线程 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } static class Thread1 extends Thread { @Override public void run() { //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown() countDownLatch.countDown(); } } static class Thread2 extends Thread { @Override public void run() { //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown() countDownLatch.countDown(); } } static class Thread3 extends Thread { @Override public void run() { //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown() countDownLatch.countDown(); } } } |
实际应用:异步通信 ----远程通信 没有返回等待 ;启动应用时候,三方应用检测
CountDownLatch有两个主要的方法,一个是await,用于在不满足条件时挂起当前的线程,一个是countDown,表示要满足的条件发生了一次,如果countDown调用的次数大于等于在创建CountDownLatch时指定的次数,则await上阻塞的线程将被全部唤醒。CountDownLatch也是使用的aqs来实现的,在创建时就要指定一个数字,表示要调用countDown的次数,其实就是aqs的state标记,他的实现的原理是每调用一次countDown方法,state标记就减1,直到变为0 ,在state标记不为0的时候调用await的线程将进入aqs的队列中等待,即此时锁不能获得。在标记为0之后,将唤醒所有在aqs中等待的线程,即此时锁可以获得,这里的锁时共享锁,也就是ReadWriterLock类似的锁,可以同时被多个线程获得,即多个等待锁的线程在state标记变为0之后,同时获得锁,也就是同时开始运行(稍后就会发现并不是严格的同时运行的)。
可以让一个线程阻塞 正常使用
也可以让多个线程阻塞(countDownLatch.await countDown() 逆向使用-- 线程并发 先阻塞线程 await 然后调用main 线程countDown(1) 类似CyclicBarrier
实现原理:共享锁 当计数器=0的时候允许处于队列里面的所有节点线程同时抢占到锁
可以允许多个线程同时抢占到锁,然后等到计数器归零 同时唤醒所有线程
猜想:1,state 计数器 只减不增 2,线程调用countDowan() 方法的时候 state-1
源码
countDownLatch.awati() 获取共享锁
public void await() throws InterruptedException { sync.acquireSharedInterruptibly( 1 ); } public final void acquireSharedInterruptibly( int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); 尝试获得共享锁,其实是检查是否state标记是0,如果是0则返回正数,表示获得了锁,否则返回负数,要进入aqs的队列中排队等待锁。 if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); //排队等待 } protected int tryAcquireShared( int acquires) { return (getState() == 0 ) ? 1 : - 1 ; } private void doAcquireSharedInterruptibly( int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //r = 1(计数器=0) 或者 -1 if (r >= 0 ) { //如果获得锁,则设置为head,并propagate,也就是唤醒队列中所有的线程,因为这个锁是共享锁,可以多个线程同时持有。 setHeadAndPropagate(node, r); p.next = null ; // help GC failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } |
countDownLatch.countDown() 当state==0 时释放共享锁 唤醒所有处于阻塞的线程
public void countDown() { sync.releaseShared( 1 ); } public final boolean releaseShared( int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared( int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c- 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; // loop to recheck cases //唤醒head的successor,也就是head的下一个节点,这个地方很重要,因为上面的假设是有一个节点获得了锁,就会进入到这个阶段,而这个阶段就会唤醒下一个节点, //下一个节点在doAcquireSharedInterruptibly方法中可以看出,又会进入到这个循环中,也就是只要 //有一个线程被唤醒了,就会唤醒所有的线程(也可以看出这个aqs是共享锁,即可以被多个线程同时持有),同时时间的先后也可以看出,并不是严格意义上的同时的,而是先唤醒第一个等待的线程。 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; // loop on failed CAS } if (h == head) // loop if head changed break ; } } |
个人感觉一篇不错的分析文章 juc - CountDownLatch源码解读_iteye_14612的博客-CSDN博客
2,Semaphore
信号灯
线程级别的限流器:限制资源的访问
类似抢占一个令牌,如果抢占到就同行 否则等待(阻塞)
方法:
acquire(N) 可以同时抢占多个令牌哦 N>=1
release 释放一个令牌
示例:车位抢占 车位数是固定的,比如10 资源数是固定的来了20辆车 只能有10辆车能够进去,只能允许同时停放10辆车 超过10,另外=车需要排队阻塞
public class SemaphoreDemo { public static void main(String[] args) { //限制资源的访问并发数 Semaphore semaphore = new Semaphore( 10 ); for ( int i= 0 ;i< 20 ;i++) { new Car(i,semaphore).start(); } } static class Car extends Thread { private int num; private Semaphore semaphore; public Car( int num, Semaphore semaphore) { this .num = num; this .semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); //获得一个令牌 System.out.println( "第" + num + "辆车抢到一个车位" ); TimeUnit.SECONDS.sleep( 2 ); System.out.println( "第" + num + "辆车走喽~" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } } } |
猜想:
- state初始为一正整数
- acquire()–> 每次调用-1 若state小于0,挂起
- release()–>释放许可,若有被挂起的线程,则唤醒线程
源码分析
aquire() state-1
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly( 1 ); } public final void acquireSharedInterruptibly( int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync( int permits) { super (permits); } protected int tryAcquireShared( int acquires) { return nonfairTryAcquireShared(acquires); } } final int nonfairTryAcquireShared( int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } private void doAcquireSharedInterruptibly( int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; // help GC failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
release state +1
public void release() { sync.releaseShared( 1 ); } public final boolean releaseShared( int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared( int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error( "Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; // loop on failed CAS } if (h == head) // loop if head changed break ; } } |
Semaphore为什么要用共享锁?
因为可以同时释放多个令牌 意味着可以同时有多个线程可以同时抢到锁
同时允许多个线程抢到锁 注意不是同时抢占到同一把锁
重新理解互斥锁 重入锁 共享锁
3,CyclicBarrier -注意他不是使用AQS实现的
可重复的栅栏
实现上相当于多个线程通过CountDownLatch .await()方法阻塞 然后另一线程通过countDown()方法来唤醒
与countDownLatch 的区别
CyclicBarrier不需要发令枪 (调用countDown方法)
public class CyclicBarrierDemo { public static void main(String[] args) { int n = 4 ; CyclicBarrier cyclicBarrier = new CyclicBarrier(n); for ( int i = 0 ; i < n; i++) { new Writer(cyclicBarrier).start(); } } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this .cyclicBarrier = cyclicBarrier; } @Override public void run() { try { TimeUnit.SECONDS.sleep( 1 ); System.out.println(Thread.currentThread().getName() + "线程写入数据完毕,等待其他线程继续处理" ); cyclicBarrier.await(); //state-1 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.printf( "所有线程都写入完成,继续处理其他任务" ); //该代码句 可以放在回调里面 } } } 这对以上代码的优化点 所有的线程执行完毕可以回调 CyclicBarrier cyclicBarrier = new CyclicBarrier(n, ()->{ System.out.printf( "所有线程都写入完成,继续处理其他任务" ); }); |
源码分析:使用到了ReentrantLock Condition
没有直接使用AQS
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait( false , 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait( boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0 ) { // tripped boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } |
4,Exchanger