Semaphore最详细解析

官方解释:

  • 一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源

我记得考科目一的时候有一个大教室,这个教室只能同时允许两百人考试,当有一个考完之后,下一个才能进去进行考试。门口会有安检人员进行安检,这个Semaphore就相当于这个安检员。

也可以理解为停车场,停车场内的停车位是固定的,只有当一辆或多辆车开走之后外面等待的车才能进去停车。

用法:

1、定义三个资格 
Semaphore semaphore = new Semaphore(3);

ThreadPoolExecutor poolExecutor = 
        new ThreadPoolExecutor(10, 20, 
                5000, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(100));

for (int i = 0; i < 10; i++) {
    int finalI = i;
    poolExecutor.execute(new Thread() {
        @Override
        public void run() {
            try {
                //获取执行资格
                semaphore.acquire(1);
                System.out.println(finalI+"=========");
                //模拟每个线程运行的时间
                Thread.sleep(1000);
                //释放执行资格
                semaphore.release(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}


poolExecutor.shutdown();

  

运行结果如下:(同一时刻只能运行三个线程。有点模糊,凑合看)

Semaphore最详细解析

 

解析:

一、定义:

public Semaphore(int permits) {    sync = new NonfairSync(permits);}

  

Semaphroe底层也是用Sync类,默认是非公平的,也有公平的构造方法。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

  

定义的资格数其实是设置锁的状态值的(AQS之前已说过,维护锁状态值和线程等待队列)

abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }
}

  

二、为什么能限制同时执行的线程数量呢?

这就是acquire方法的用处了

public void acquire(int permits) {    sync.acquireSharedInterruptibly(permits);}

  

点进acquireSharedInterruptibly这个方法看看:

public final void acquireSharedInterruptibly(int arg)
{
    1、尝试获取锁,返回值小于0就是获取锁失败
    if (tryAcquireShared(arg) < 0)
        2、如果获取失败,则进入队列进行等待,之前已经解析过
        doAcquireSharedInterruptibly(arg);
}

  

可以看到,跟之前CountDownLatch的await方法是一样的。

tryAcquireShared方法最终执行的如下方法:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        1、获取当前锁状态,锁状态值一开始是自定义的 
        int available = getState();
        2、当前申请后剩余的锁状态值
        int remaining = available - acquires;
        if (3、如小于0,则申请失败,进入等待队列中
            remaining < 0 ||
            4、CAS替换锁状态值
            compareAndSetState(available, remaining))
            return remaining;
    }
}

  

上述是非公平的,公平的只加了一个判断线程等待队列前是否有其它线程。排队一个一个来。

static final class FairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

  

这个就是为什么Semaphore能控制当前并发线程的数量的原因。

三、释放锁

线程获取执行资格之后需要释放锁。这就是release方法的用处。不释放的话锁会一直被占用,其他线程就无法运行。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

  

点进releaseShared看看

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

  

跟之前的CountDownLatch是一样的,只是实现不一样。Semaphore实现如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        1、获取锁当前状态
        int current = getState();
        2、释放锁,直接相加
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        3、用CAS更新锁状态
        if (compareAndSetState(current, next))
            return true;
    }
}

  

=======================================================

我是Liusy,一个喜欢健身的程序员。

欢迎关注微信公众号【Liusy01】,一起交流Java技术及健身,获取更多干货,最新更新【K8S】。

上一篇:【干货分享】深入理解高可用之限流


下一篇:JDK源码系列(3):CyclicBarrier