JUC 并发工具

J.U.C 并发工具

aqs共享锁应用

1,CountDownLatch

计数器倒计时

方法:

await

countdown

JUC 并发工具

 

说明:1,static CountDownLatch countDownLatch = new CountDownLatch(3)  表示控制三个线程的操作

2,main线程里面调用countDownLatch.await()  阻塞 ,每次线程调用countDown() 方法 的时候,计数器-1 当计数器 = 0 时 唤醒main线程

代码示例

public class CountDownLatchDemo {

    static CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) {

        Thread1 t1 = new Thread1();

        t1.start();

        Thread2 t2 = new Thread2();

        t2.start();

        Thread3 t3 = new Thread3();

        t3.start();

        try {

            //该处阻塞主线程  等到各自的线程做完了自己的事情

            //当3个线程全部执行完成 countDownLatch = 0  唤醒main线程

            countDownLatch.await();

        catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

    static class Thread1 extends Thread {

        @Override

        public void run() {

            //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown()

            countDownLatch.countDown();

        }

    }

    static class Thread2 extends Thread {

        @Override

        public void run() {

            //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown()

            countDownLatch.countDown();

        }

    }

        static class Thread3 extends Thread {

            @Override

            public void run() {

                //todo sth 做自己的事情 做完后给个通知 调用countDownLatch.countDown()

                countDownLatch.countDown();

            }

        }

}

实际应用:异步通信 ----远程通信 没有返回等待 ;启动应用时候,三方应用检测

CountDownLatch有两个主要的方法,一个是await,用于在不满足条件时挂起当前的线程,一个是countDown,表示要满足的条件发生了一次,如果countDown调用的次数大于等于在创建CountDownLatch时指定的次数,则await上阻塞的线程将被全部唤醒。CountDownLatch也是使用的aqs来实现的,在创建时就要指定一个数字,表示要调用countDown的次数,其实就是aqs的state标记,他的实现的原理是每调用一次countDown方法,state标记就减1,直到变为0 ,在state标记不为0的时候调用await的线程将进入aqs的队列中等待,即此时锁不能获得。在标记为0之后,将唤醒所有在aqs中等待的线程,即此时锁可以获得,这里的锁时共享锁,也就是ReadWriterLock类似的锁,可以同时被多个线程获得,即多个等待锁的线程在state标记变为0之后,同时获得锁,也就是同时开始运行(稍后就会发现并不是严格的同时运行的)。
 

可以让一个线程阻塞  正常使用

也可以让多个线程阻塞(countDownLatch.await   countDown()  逆向使用-- 线程并发  先阻塞线程  await   然后调用main 线程countDown(1)  类似CyclicBarrier

实现原理:共享锁   当计数器=0的时候允许处于队列里面的所有节点线程同时抢占到锁

可以允许多个线程同时抢占到锁,然后等到计数器归零 同时唤醒所有线程

猜想:1,state 计数器 只减不增   2,线程调用countDowan() 方法的时候  state-1

JUC 并发工具

 

源码

countDownLatch.awati()  获取共享锁

public void await() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

}

public final void acquireSharedInterruptibly(int arg)

        throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

        尝试获得共享锁,其实是检查是否state标记是0,如果是0则返回正数,表示获得了锁,否则返回负数,要进入aqs的队列中排队等待锁。

    if (tryAcquireShared(arg) < 0)

        doAcquireSharedInterruptibly(arg);//排队等待

}

protected int tryAcquireShared(int acquires) {

    return (getState() == 0) ? 1 : -1;

}

private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        for (;;) {

            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);  //r = 1(计数器=0)  或者 -1

                if (r >= 0) {

                    //如果获得锁,则设置为head,并propagate,也就是唤醒队列中所有的线程,因为这个锁是共享锁,可以多个线程同时持有。

                    setHeadAndPropagate(node, r);

                    p.next = null// help GC

                    failed = false;

                    return;

                }

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                throw new InterruptedException();

        }

    finally {

        if (failed)

            cancelAcquire(node);

    }

}

private void setHeadAndPropagate(Node node, int propagate) {

    Node h = head; // Record old head for check below

    setHead(node);

     

    if (propagate > 0 || h == null || h.waitStatus < 0 ||

        (h = head) == null || h.waitStatus < 0) {

        Node s = node.next;

        if (s == null || s.isShared())

            doReleaseShared();

    }

}

countDownLatch.countDown()  当state==0 时释放共享锁  唤醒所有处于阻塞的线程

public void countDown() {

    sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

protected boolean tryReleaseShared(int releases) {

    // Decrement count; signal when transition to zero

    for (;;) {

        int c = getState();

        if (c == 0)

            return false;

        int nextc = c-1;

        if (compareAndSetState(c, nextc))

            return nextc == 0;

    }

}

private void doReleaseShared() {

     

    for (;;) {

        Node h = head;

        if (h != null && h != tail) {

            int ws = h.waitStatus;

            if (ws == Node.SIGNAL) {

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                    continue;            // loop to recheck cases

            //唤醒head的successor,也就是head的下一个节点,这个地方很重要,因为上面的假设是有一个节点获得了锁,就会进入到这个阶段,而这个阶段就会唤醒下一个节点,

            //下一个节点在doAcquireSharedInterruptibly方法中可以看出,又会进入到这个循环中,也就是只要

            //有一个线程被唤醒了,就会唤醒所有的线程(也可以看出这个aqs是共享锁,即可以被多个线程同时持有),同时时间的先后也可以看出,并不是严格意义上的同时的,而是先唤醒第一个等待的线程。

                unparkSuccessor(h);

            }

            else if (ws == 0 &&

                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;                // loop on failed CAS

        }

        if (h == head)                   // loop if head changed

            break;

    }

}

个人感觉一篇不错的分析文章 juc - CountDownLatch源码解读_iteye_14612的博客-CSDN博客

2,Semaphore

信号灯 

线程级别的限流器:限制资源的访问

类似抢占一个令牌,如果抢占到就同行 否则等待(阻塞)

方法:

acquire(N)  可以同时抢占多个令牌哦   N>=1

release 释放一个令牌

示例:车位抢占  车位数是固定的,比如10  资源数是固定的来了20辆车 只能有10辆车能够进去,只能允许同时停放10辆车  超过10,另外=车需要排队阻塞

public class SemaphoreDemo {

    public static void main(String[] args) {

        //限制资源的访问并发数

        Semaphore semaphore = new Semaphore(10);

        for (int i=0;i<20;i++) {

            new Car(i,semaphore).start();

        }

    }

    static class Car extends Thread {

        private int num;

        private Semaphore semaphore;

        public Car(int num, Semaphore semaphore) {

            this.num = num;

            this.semaphore = semaphore;

        }

        @Override

        public void run() {

            try {

                semaphore.acquire();//获得一个令牌

                System.out.println("第" + num + "辆车抢到一个车位");

                TimeUnit.SECONDS.sleep(2);

                System.out.println("第" + num +"辆车走喽~");

            catch (InterruptedException e) {

                e.printStackTrace();

            finally {

                semaphore.release();

            }

        }

    }

}

猜想:

  • state初始为一正整数  
  • acquire()–>  每次调用-1   若state小于0,挂起
  • release()–>释放许可,若有被挂起的线程,则唤醒线程
     

源码分析

aquire()   state-1

public void acquire() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);

}

public final void acquireSharedInterruptibly(int arg)

        throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    if (tryAcquireShared(arg) < 0)

        doAcquireSharedInterruptibly(arg);

}

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {

        super(permits);

    }

    protected int tryAcquireShared(int acquires) {

        return nonfairTryAcquireShared(acquires);

    }

}

final int nonfairTryAcquireShared(int acquires) {

    for (;;) {

        int available = getState();

        int remaining = available - acquires;

        if (remaining < 0 ||

            compareAndSetState(available, remaining))

            return remaining;

    }

}

private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        for (;;) {

            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);

                if (r >= 0) {

                    setHeadAndPropagate(node, r);

                    p.next = null// help GC

                    failed = false;

                    return;

                }

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                throw new InterruptedException();

        }

    finally {

        if (failed)

            cancelAcquire(node);

    }

}

release  state +1

public void release() {

    sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

protected final boolean tryReleaseShared(int releases) {

    for (;;) {

        int current = getState();

        int next = current + releases;

        if (next < current) // overflow

            throw new Error("Maximum permit count exceeded");

        if (compareAndSetState(current, next))

            return true;

    }

}

private void doReleaseShared() {

    for (;;) {

        Node h = head;

        if (h != null && h != tail) {

            int ws = h.waitStatus;

            if (ws == Node.SIGNAL) {

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                    continue;            // loop to recheck cases

                unparkSuccessor(h);

            }

            else if (ws == 0 &&

                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;                // loop on failed CAS

        }

        if (h == head)                   // loop if head changed

            break;

    }

}

Semaphore为什么要用共享锁?

因为可以同时释放多个令牌 意味着可以同时有多个线程可以同时抢到锁

同时允许多个线程抢到锁  注意不是同时抢占到同一把锁

重新理解互斥锁  重入锁 共享锁  

3,CyclicBarrier -注意他不是使用AQS实现的

可重复的栅栏  

实现上相当于多个线程通过CountDownLatch .await()方法阻塞  然后另一线程通过countDown()方法来唤醒

与countDownLatch 的区别

CyclicBarrier不需要发令枪 (调用countDown方法)

public class CyclicBarrierDemo {

    public static void main(String[] args) {

        int n = 4;

        CyclicBarrier cyclicBarrier = new CyclicBarrier(n);

        for (int i = 0; i < n; i++) {

            new Writer(cyclicBarrier).start();

        }

    }

    static class Writer extends Thread {

        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {

            this.cyclicBarrier = cyclicBarrier;

        }

        @Override

        public void run() {

            try {

                TimeUnit.SECONDS.sleep(1);

                System.out.println(Thread.currentThread().getName() + "线程写入数据完毕,等待其他线程继续处理");

                cyclicBarrier.await();   //state-1

            catch (InterruptedException e) {

                e.printStackTrace();

            catch (BrokenBarrierException e) {

                e.printStackTrace();

            }

            System.out.printf("所有线程都写入完成,继续处理其他任务"); //该代码句 可以放在回调里面

        }

    }

}

这对以上代码的优化点  所有的线程执行完毕可以回调

CyclicBarrier cyclicBarrier = new CyclicBarrier(n, ()->{

    System.out.printf("所有线程都写入完成,继续处理其他任务");

});

 源码分析:使用到了ReentrantLock  Condition

没有直接使用AQS

JUC 并发工具

 

public int await() throws InterruptedException, BrokenBarrierException {

    try {

        return dowait(false, 0L);

    catch (TimeoutException toe) {

        throw new Error(toe); // cannot happen

    }

}

private int dowait(boolean timed, long nanos)

    throws InterruptedException, BrokenBarrierException,

           TimeoutException {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        final Generation g = generation;

        if (g.broken)

            throw new BrokenBarrierException();

        if (Thread.interrupted()) {

            breakBarrier();

            throw new InterruptedException();

        }

        int index = --count;

        if (index == 0) {  // tripped

            boolean ranAction = false;

            try {

                final Runnable command = barrierCommand;

                if (command != null)

                    command.run();

                ranAction = true;

                nextGeneration();

                return 0;

            finally {

                if (!ranAction)

                    breakBarrier();

            }

        }

        // loop until tripped, broken, interrupted, or timed out

        for (;;) {

            try {

 

                if (!timed)

                    trip.await();

                else if (nanos > 0L)

                    nanos = trip.awaitNanos(nanos);

            catch (InterruptedException ie) {

                if (g == generation && ! g.broken) {

                    breakBarrier();

                    throw ie;

                else {

                    // We're about to finish waiting even if we had not

                    // been interrupted, so this interrupt is deemed to

                    // "belong" to subsequent execution.

                    Thread.currentThread().interrupt();

                }

            }

            if (g.broken)

                throw new BrokenBarrierException();

            if (g != generation)

                return index;

            if (timed && nanos <= 0L) {

                breakBarrier();

                throw new TimeoutException();

            }

        }

    finally {

        lock.unlock();

    }

}

4,Exchanger

上一篇:2021年全网最新Java多线程,锁,JMM,JUC和高并发设计模式震撼来袭!


下一篇:JUC之Java中的阻塞队列及其实现原理