J.U.C中的工具类及原理分析(CountDownLatch、Semaphore、CyclicBarrier)

前面文章讲述的Synchronized和ReentrantLock都是Java中的互斥锁,排它锁,其实在J.U.C中还有一种锁,那就是共享锁,让我们一起来看看吧.

什么是共享锁?

该锁可以被多个线程共有,不与其他线程互斥,非独占状态。
主要以CountDownLatch、Semaphore、CyclicBrrier来分析共享锁原理

1.CountDownLatch

计数器,通过wait/notify操作state

1.1.Java中的使用

//初始化计数器 3
static CountDownLatch latch = new CountDownLatch(3);

public static void main(String[] args) throws InterruptedException {
    new Thread(()->{
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我是线程1,我执行完了");
        //减少计数器
        latch.countDown();
    }).start();
    new Thread(()->{
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我是线程2,我执行完了");
        latch.countDown();
    }).start();
    new Thread(()->{
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我是线程3,我执行完了");
        latch.countDown();
    }).start();
    //在这里阻塞 等待计数器归0后被唤醒
    latch.await();
    System.out.println("所有线程都执行完了!");
}

1.2.源码分析

  • 构造方法和 latch.await(); 主线程等待其他结束
//static CountDownLatch latch = new CountDownLatch(3);
Sync(int count) {
    //通过构造方法 实际上state=3
    setState(count);
}
//latch.await();
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //判断共享锁的状态 (getState() == 0) ? 1 : -1;
    if (tryAcquireShared(arg) < 0)
        //尝试去获取共享锁
        doAcquireSharedInterruptibly(arg);
}
//抢占锁并处理interrupt
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //当前线程添加到AQS队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
        	//获取前置节点
            final Node p = node.predecessor();
            //前置节点 = head节点的话 当前线程排在第一位 直接尝试获取
            if (p == head) {
                //尝试获取共享锁 (getState() == 0) ? 1 : -1;
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //如果next节点是共享并且Node.SIGNAL等待唤醒状态 这里都会去尝试唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //抢占失败的走这里 判断一些条件 并阻塞线程 
            if (shouldParkAfterFailedAcquire(p, node) &&
                //LockSupport.park(this); 阻塞并检查interrupt 
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • latch.countDown(); state == 0 则唤醒等待中的主线程
public final boolean releaseShared(int arg) {//arg = 1
    //释放一次锁
    if (tryReleaseShared(arg)) {
        //唤醒节点中的等待线程 也就是主线程
        //也就是AbstractQueuedSynchronizer#unparkSuccessor中的LockSupport.unpark(s.thread);
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        //实际上就是操作state 
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        //原始值为计数器的值 每次减一直到0才返回true唤醒主线程
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

2.Semaphore

本质:

  • 抢占一个令牌,如果抢到就通行,否则就阻塞
  • 限制资源的并发数

实际应用:信号灯、限流、限制资源的访问

2.1 Java中的使用

static class Elevator extends Thread {
    int num;
    Semaphore semaphore;
    //电梯可同时容纳人数
    public Elevator(int num, Semaphore semaphore) {
        this.num = num;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            //令牌数-1,如果为0就阻塞 可以阻塞多个
            semaphore.acquire();
            num++;
            System.out.println("第" + num + "进入电梯!");
            Thread.sleep(1000);
            System.out.println("第" + num + "人下了");
        } catch (Exception e) {

        } finally {
            //令牌数+1,唤醒等待的线程 可以唤醒多个
            semaphore.release();
        }
    }
}
public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(10);
    for (int i = 0; i < 20; i++) {
        new Elevator(i,semaphore).start();
    }
}

原理和计数器差不多,区别就在于多了release加令牌数

	protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                //令牌数会加回去
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

3.CyclicBarrier

不是AQS实现,可重复实现的栅栏,需要达到指定数量线程才会被唤醒,比如投票,需要等所有票数收集完才会执行统计

3.1 Java中的使用

public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(5,()->{
            System.out.println("投票完成,开始统计!");
        });
    for (int i = 0; i < 5; i++) {
        new Toupiao(barrier).start();
    }
}

static class Toupiao extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Toupiao(CyclicBarrier cyclicBarrier){
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"投票了");
            //与CountDownLatch有异曲同工的之妙
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.2 源码分析

  • await()
//构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        //所有线程执行完以后 会执行这个线程
        this.barrierCommand = barrierAction;
    }

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    //加锁           
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
		//count 每次减1,直到为0就唤醒所有的线程
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                	//同步执行构造方法传入的统计线程代码块
                    command.run();
                ranAction = true;
                //跟进去就是trip.signalAll(); 唤醒所有的线程
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                //上面不满足的话 会在这里阻塞
                if (!timed)
                    //trip Condition trip = lock.newCondition();
                 	//使用的Condition 就不再重复了 上面signalAll唤醒所有线程
                    trip.await();
                else if (nanos > 0L)//带超时时间的
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            。。。
        }
    } finally {
        lock.unlock();
    }
}

以上就是本章的全部内容了。重点是自己根据不同的需求去使用

上一篇:线程通信synchronized中的wait/notify、J.U.C Condition的使用和源码分析
下一篇:Java本地线程变量ThreadLocal的神秘面纱

知之者不如好之者,好之者不如乐之者

上一篇:HTTP -> Asp.net(第一篇)


下一篇:通俗易懂讲 CompletableFuture