Java 并发编程Semaphore的应用与源码解析

What

Semaphore标识信号量,允许指定数量的线程同时访问某个资源

How

通过以下两部实现信号量:

  • acquire方法用于获得准入许可(如果没有获得许可,则进行等待,直到有线程释放许可而获得许可为止)
  • release用于释放准入许可

应用场景

  1. 实现某种资源池限制,类似于数据库连接池
  2. 对容器施加边界,比如一个集合中最多只能添加5个元素
  3. 资源并发访问数量限制
  4. 当作普通的使用(信号量为1时相当于普通的锁 信号量大于1时共享锁)

Semaphore代码示例

import java.util.concurrent.Semaphore;

public class SemaphoreDemo implements Runnable {

    Semaphore semaphore = new Semaphore(5);

    public static void main(String[] args) {
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(semaphoreDemo);
            thread.start();
        }
    }

    @Override
    public void run() {
        try {
            // 获得准入许可(如何没有获得成功,则进行等待,直到有线程释放许可而获得该许可为止)
            semaphore.acquire();
            Thread.sleep(1000);
            System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + ", 执行完毕!");
            // 释放准入许可
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

示例中,有10条线程对信号量为5的资源进行争用,但每次只有5个线程拿到许可,另外的线程需要等待拿到许可的线程释放许可后才能拿到许可

Semaphore源码解析

关键方法如下:

  1. 构造方法:new Semaphore(5);
  2. 获取许可:semaphore.acquire();
  3. 释放许可:semaphore.release();

接下来我们从这三个方法入手进行源码解析

1. 构造方法:new Semaphore(5)

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
 
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

默认构造方法为非公平共享锁,可以通过构造参数fair来选择公平或非公平,类似于ReentantLock

2. 获取许可:semaphore.acquire()

    public void acquire() throws InterruptedException {
        // 共享式获取AQS的同步状态
        sync.acquireSharedInterruptibly(1);
    }

调用的是AQS的acquireSharedInterruptibly方法:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquireShared(arg) < 0) {
            doAcquireSharedInterruptibly(arg);
        }
    }

其中tryAcquireShared依赖于Sync实现,在Semaphore中有AQS的实现Sync类,方法如下:

        // 尝试获取共享锁
        protected int tryAcquireShared(int acquires) {
            for (; ; ) {
                // 队列中存在等待线程则返回-1
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState(); // 可用许可数量
                int remaining = available - acquires; // 剩余许可数量

                if (remaining < 0 || compareAndSetState(available, remaining))
                    // 返回可用的余量
                    return remaining;
            }
        }

这是FairSync的tryAcquireShared方法,在NonfairSync中,没有hasQueuedPredecessors()判断,其余一样。

在方法中可以看出,最终返回的是剩余的许可数量,有如下几种情况:

  1. 如果剩余许可数量<0,则执行doAcquireSharedInterruptibly方法让线程自旋等待,这里是等待别的线程释放许可后线程被唤醒去尝试获取

        private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
            // 线程进入同步队列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (; ; ) { // 自旋
                    final Node p = node.predecessor();
                    if (p == head) { // 当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点
                        int r = tryAcquireShared(arg); // 再去获取信号量
                        if (r >= 0) {
                            setHeadAndPropagate(node, r); // 退出自旋
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 判断是否应该挂起该线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt()) {
                        throw new InterruptedException();
                    }
                }
            } finally {
                if (failed) {
                    cancelAcquire(node); // 获取失败 就取消获取
                }
            }
        }
    
  2. 否则就是拿到了许可数量,继续正常执行,不阻塞

3. 释放许可:semaphore.release()

    public void release() {
        sync.releaseShared(1);
    }

同样,调用的是AQS的releaseShared方法,看下代码:

    public final boolean releaseShared(int arg) {
        // 调用AQS实现类的tryReleaseShared
        if (tryReleaseShared(arg)) {
            // 唤醒后续的线程节点
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared交由子类Sync实现,代码如下:

        protected final boolean tryReleaseShared(int releases) {
            for (; ; ) {
                int current = getState(); // 当前信号量许可数
                int next = current + releases; // 当前信号量许可数+释放的信号量许可数
                if (next < current)  // overflow 一般不会走进来
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) // CAS更新当前信号量许可数
                    return true;
            }
        }

释放许可成功则继续调用AQS的doReleaseShared方法来唤醒后续节点可以来争取许可了

    private void doReleaseShared() {
        for (; ; ) { // 自旋等待
            Node h = head;
            // 有头节点且头节点和尾节点不是同一个
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 设置status为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        continue; // 循环检查
                    }
                    // 唤醒节点的后续节点
                    unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //
                    continue; // 失败则继续循环
                }
            }
            if (h == head) {
                break;
            }
        }
    }

总结

Semaphore使用AQS同步状态来保存信号量的计数器。
acquireSharedInterruptibly会减少计数(获取许可),当计数为非正值的时候阻塞线程,否则不会阻塞线程
releaseShared方法会增加计数(释放许可),在计数不超过信号量限制时会解除线程的阻塞(获取到许可的线程)

上一篇:源码分析:Semaphore之信号量


下一篇:Java中的并发工具类CountDownLatch CyclicBarrier Semaphore