java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri

文章目录


前言

这篇文章讨论读写锁stampedLock。文章根据《Java并发编程的艺术》这本书以及黑马的视频 黑马多线程 做的笔记。


1. stampedLock

1. 概述

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁升级成真正的读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
 // 锁升级
}



2. 代码

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

class DataContainerStamped {
	//数据
    private int data;
    //StampedLock 锁
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
        this.data = data;
    }
	//读取操作
    public int read(int readTime) {
        //首先获取stamp
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        //验证如果是有效的,证明这期间没有写操作,直接返回即可,这时还是乐观锁
        if (lock.validate(stamp)) {
            //就可以读到数据
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 否则证明已经有写锁修改过了,这里需要再次获取读锁,升级为真正的读锁
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            //获取stamp
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }

    public void write(int newData) {
        //获取戳
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}



1. 读读

public class TestStampedLock {
    public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.read(0);
        }, "t2").start();
    }
}

输出结果:下面结果中可以看到两个线程同时获取读锁并执行读操作,没有先后的关系。

19:09:07.857 [t1] DEBUG c.DataContainerStamped - optimistic read locking...256
19:09:08.361 [t2] DEBUG c.DataContainerStamped - optimistic read locking...256
19:09:08.362 [t2] DEBUG c.DataContainerStamped - read finish...256, data:1
19:09:08.873 [t1] DEBUG c.DataContainerStamped - read finish...256, data:1



2. 读写

public class TestStampedLock {
    public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            dataContainer.write(0);
        }, "t2").start();
    }
}

结果输出:一开始是读操作先睡眠一秒,在睡眠之前已经获取了戳了,在 t1 线程睡眠期间 t2 线程获取到了写锁,并将数据修改,而且戳也改成了384.此时 t1 线程醒过来校验发现戳已经被修改了,所以这时候 t1 线程会等待 t2 线程释放写锁之后去获取读锁。完成从乐观读 -> 读锁 的升级。

19:10:49.987 [t1] DEBUG c.DataContainerStamped - optimistic read locking...256
19:10:50.485 [t2] DEBUG c.DataContainerStamped - write lock 384
19:10:50.998 [t1] DEBUG c.DataContainerStamped - updating to read lock... 256
19:10:52.498 [t2] DEBUG c.DataContainerStamped - write unlock 384
19:10:52.498 [t1] DEBUG c.DataContainerStamped - read lock 513
19:10:53.508 [t1] DEBUG c.DataContainerStamped - read finish...513, data:0
19:10:53.508 [t1] DEBUG c.DataContainerStamped - read unlock 513

3. 注意

  • StampedLock 不支持条件变量(await、signal 这些没法用)
  • StampedLock 不支持可重入



2. Semaphore

1. 基本使用

信号量,用来限制能同时访问共享资源的线程上限。它通过协调各个线程,以保证合理的使用公共资源。

@Slf4j
public class TestSemaphore {
    public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        //这里设置上限为3,表示线程只支持三个,达到了3个线程之后这个变量就为0了
        //第二个参数是表示公平非公平:其他线程来了如果是公平是不可能竞争的
        //如果是非公平是可以和等待队列里面的线程竞争的
        Semaphore semaphore = new Semaphore(3);

        // 2. 10个线程同时运行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    //3. 获取许可,acquire后semaphore-1变成2
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } finally {
                    semaphore.release();
                }
            }).start();
        }
    }
}

结果输出:很明显的是,首先 0,1,2三个线程先执行,然后其他线程暂停,然后线程0,1,2执行完成之后其他线程接着竞争三个名额。
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri


2. 应用场景

Semaphore 可以用于流量控制,特别是公共资源有限的应用场景,比如数据库连接。加入有一个需求,需要读取几万个文件的数据,这属于 IO 密集型任务,我们可以启动几十个线程去读取文件,但是在读取到本地之后要写入数据库时我们就得控制连接数,否则连接数过多会报错无法获取连接。这时候就可以使用 Semaphore 来控制并发数。

  • 使用 Semaphore 在高峰时让请求线程阻塞,等到高峰过去了再释放许可,当然它只适合限制单机线程数量(没有考虑分布式),并且仅仅是限制线程数而不是资源数。该处理多少资源还是得处理多少资源
  • 使用 Semaphore 简单实现连接池(一个线程对应一个连接),对比享元模式下的实现(wait,notify),性能和可读性更好。
public class TestPoolSemaphore {
    public static void main(String[] args) {
        Pool pool = new Pool(2);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Connection conn = pool.borrow();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pool.free(conn);
            }).start();
        }
    }
}

class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;
    private Semaphore semaphore;

    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致,保证一个线程一个资源
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i+1));
        }
    }

    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里,肯定能找到空闲连接的,线程数和连接数一样
        return null;
    }
    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                //归还许可
                semaphore.release();
                break;
            }
        }
    }
}

class MockConnection implements Connection{}

输出结果:可以看到,这里我们设置了大小为2之后,每次获取都是两个两个获取的。其他线程就在等着,下面就以这个为例,讲讲原理
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri


3. 原理

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

1、刚开始,假设permits(state)为 3,这时 5 个线程来获取资源,下面时构造方法的调用链
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri
其实到这里,本质上就是赋值给了 AQS 中的 state
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



2、假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,这时候就进入 AQS 队列park 阻塞
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



3、这时 Thread-4 释放了 permits,状态如下
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri
接下来 Thread-0 竞争成功,permits 从1再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



加锁


public void acquire() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
      throws InterruptedException {
     if (Thread.interrupted())
         throw new InterruptedException();
     //tryAcquireShared返回值时剩余的资源数,当满了之后就返回负数
     if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
 }

//tryAcquireShared里面调用了一个这个方法
 final int nonfairTryAcquireShared(int acquires) {
     for (;;) {
          //获取状态,还剩多少名额
          int available = getState();
          //减去许可数 3-1 = 2,如果这时候已经为0了,那么就返回负数
          int remaining = available - acquires;
          //设置剩余的许可数
          if (remaining < 0 ||
              compareAndSetState(available, remaining))
              //返回剩余的数
              return remaining;
      }
  }

//doAcquireSharedInterruptibly已经说过了
private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
     //创建 Share 节点
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         for (;;) {
             //找到前驱节点
             final Node p = node.predecessor();
             //判断是不是头
             if (p == head) {
                 //此时当前线程时老二,尝试再次获取
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     //获取成功了就设置头节点为空,并且唤醒后面所有的共享节点
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
             //然后在这里park住
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }

解锁

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

//释放
 protected final boolean tryReleaseShared(int releases) {
     
     for (;;) {
        //拿到状态,就是剩余的资源数,此时是0
         int current = getState();
         //释放了一个当然要加上1了
         int next = current + releases;
         //如果next < 当前的,证明加法溢出了
         if (next < current) // overflow
             throw new Error("Maximum permit count exceeded");
          //CAS设置
         if (compareAndSetState(current, next))
             return true;
     }
 }

  private void doReleaseShared() {
      //for循环
      for (;;) {
         //获取头节点,如果是tail,证明此时没有其他等待节点了
         Node h = head;
         if (h != null && h != tail) {
             //获取状态
             int ws = h.waitStatus;
             //如果是-1,证明有义务唤醒下一个节点
             if (ws == Node.SIGNAL) {
                 //把状态从 -1 改成 0
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                     continue;   
                  //唤醒下一个节点
                 unparkSuccessor(h);
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                 continue;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break;
     }
 }



3. CountdownLatch

1. 介绍

用来进行线程同步协作,等待所有线程完成倒计时

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数 -1

下面是内部的锁的实现

 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
		//设置状态,就是计数值
        Sync(int count) {
            setState(count);
        }
        
        int getCount() {
            return getState();
        }
        //这个方法就是看看线程有没有都执行完了,如果是0就表示线程都执行完了
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		//释放锁
        protected boolean tryReleaseShared(int releases) {
            //一个线程用完就让计数值-1
            for (;;) {
                //获取状态
                int c = getState();
                if (c == 0)
                    return false;
                //-1
                int nextc = c-1;
                //CAS修改
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }



2. 使用

@Slf4j(topic = "c.TestCountDownLatch")
public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CountDownLatch latch = new CountDownLatch(3);
        new Thread(()->{
            log.debug("线程 t1 开始");
            sleep(1);
            latch.countDown();
            log.debug("线程 t1 结束");
        }, "t1").start();
        new Thread(()->{
            log.debug("线程 t2 开始");
            sleep(2);
            latch.countDown();
            log.debug("线程 t2 结束");
        }, "t2").start();
        new Thread(()->{
            log.debug("线程 t3 开始");
            sleep(5);
            latch.countDown();
            log.debug("线程 t3 结束");
        }, "t3").start();

        log.debug("主线程等待其他线程执行完成再往下执行");
        latch.await();
        log.debug("主线程执行完成");
    }

输出结果:可以看到下面主线程是等待其他线程执行完成了才继续往下执行的,至于下面的 t3 结束打印在最后是因为 t3 结束先唤醒了主线程
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri

其实后来的线程的使用基本都是使用线程池的,所以线程一般不会轻易结束,这时候用 join 那些 api 就不行了。

我们来看下面的线程池用法:我们使用三个线程来执行任务,然后一个线程等待任务结束返回结果

@Slf4j(topic = "c.TestCountDownLatch")
public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newFixedThreadPool(4);
        service.submit(()->{
            log.debug("线程开始");
            sleep(1);
            latch.countDown();
            log.debug("线程结束");
        });
        service.submit(()->{
            log.debug("线程开始");
            sleep(1.5);
            latch.countDown();
            log.debug("线程结束");
        });
        service.submit(()->{
            log.debug("线程开始");
            sleep(2);
            latch.countDown();
            log.debug("线程结束");
        });
        service.submit(()->{
            try {
                log.debug("线程等待结果");
                latch.await();
                log.debug("线程获取到结果");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

输出结果:
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



3. 应用之等待多线程加载完毕

我们定义下面这个方法,然后实现10个用户加载的情况,等到加载完成打印一句游戏开始

 private static void wangzherongyao() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        CountDownLatch latch = new CountDownLatch(10);
        Random r = new Random();
        String[] all = new String[10];
        //10个玩家
        for (int j = 0; j < 10; j++) {
            //final是因为lambda表达式接收的是常量
            final int k = j;
            service.submit(()->{
                for (int i = 0; i <= 100; i++) {
                    try {
                        Thread.sleep(r.nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    all[k] = i + "%";
                    //使用 \r 可以回退到最开始的输出位置然后对原来的输出进行覆盖
                    System.out.print("\r" + Arrays.toString(all));
                }
                //一个任务运行结束了就 -1
                latch.countDown();
            });
        }
        //主线程等待任务执行完成
        latch.await();
        System.out.println("\n游戏开始");
    }

最终输出结果:
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



4. 应用之等待多个远程调用结束

下面是使用restTemplate来发送请求,在工作中的应用可以是当要不断一个接口请求多此的时候,此时可以用到多线程配合CountDown 来进行计数,当读取完成接口之后就可以对这些获取到的数据进行调用

private static void waitForObject() throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("等待远程服务调用返回结果");
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(4);
        service.submit(() -> {
            Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            latch.countDown();
        });
        service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            latch.countDown();
        });
        service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
            latch.countDown();
        });
        service.submit(() -> {
            Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
            latch.countDown();
        });

        latch.await();
        log.debug("执行完毕");
        service.shutdown();
    }

如果想要返回结果,那么还是使用 Future 更合使:
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri



4. CyclicBarrier

1. 概念

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。

为什么要用 CyclicBarrier,这个类作用和 CountdownLatch 类似,但是不同点就在于 CyclicBarrier是支持重用的,使用里面的 reset 方法进行重置。所以 CyclicBarrier可以处理更加复杂的业务,比如在计数错误的时候可以进行重置。此外 CyclicBarrier提供了 getNumberWaiting 方法可以获取阻塞的线程个数。isBroken() 方法可以用来了解阻塞的线程是否被中断



2. 基本使

private static void test2() {
        //创建大小为3的线程池对象
        ExecutorService service = Executors.newFixedThreadPool(2);
        //我们设置计数为2,第二个参数是任务,实际是其他两个任务执行完成之后会执行这个任务
        CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
            log.debug("任务1任务2结束");
        });
        for (int i = 0; i < 3; i++) { // task1  task2  task1
            service.submit(() -> {
                log.debug("任务1开始运行");
                sleep(1);
                try {
                    //调用 await 方法进行等待
                    barrier.await(); // 2-1=1
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            service.submit(() -> {
                log.debug("任务2开始运行");
                sleep(2);
                try {
                    barrier.await(); // 1-1=0
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }

输出结果:
java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri

注意线程池的数量和计数值要一样,上面这个例子,如果设置线程为3,那么一次执行的顺序是 第1个任务1、第1个任务2 和 第2个任务1,注意任务二这时睡眠了2s,也就是说三个线程执行的时间刚刚好是2s,和线程数是2的时候的结果是一样的,此时执行完成后计数器重置。这样就不对了,这样就是第一个任务2 和 第二个 任务1 让计数器变为 0.



3. 业务场景

使用《Java并发编程的艺术》这本书里面给出的例子。这个计数器可以用于多线程计算数据,最终合并计算结果的场景。例如,用一个 Excel 保存了所有的银行流水,每个 Sheet 保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完成之后,得到每个 sheet 的日军银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均 银行流水,代码如下:

public class BankWaterService implements Runnable{
    /**
     * 创建4个屏障,处理完之后执行当前类的 run 方法
     */
    private CyclicBarrier c = new CyclicBarrier(4, this);

    /**
     * 假设只有四个 sheet,那么启动四个线程
     */
    private Executor executor = Executors.newFixedThreadPool(4);

    /**
     * 保存每个sheet处理出来的结果
     */
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private void count(){
        for (int i = 0; i < 4; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //计算当前sheet的影流数据,代码略,下面直接模拟结果
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        c.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void run() {
        int result = 0;
        //汇总每个sheet计算出的结果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        //将结果输出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();   //4
    }
}





如有错误,欢迎指出!!!

上一篇:翁恺老师慕课零基础学JAVA语言【Day5】


下一篇:【java】day5