官方解释:
- 一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源
我记得考科目一的时候有一个大教室,这个教室只能同时允许两百人考试,当有一个考完之后,下一个才能进去进行考试。门口会有安检人员进行安检,这个Semaphore就相当于这个安检员。
也可以理解为停车场,停车场内的停车位是固定的,只有当一辆或多辆车开走之后外面等待的车才能进去停车。
用法:
1、定义三个资格 Semaphore semaphore = new Semaphore(3); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(100)); for (int i = 0; i < 10; i++) { int finalI = i; poolExecutor.execute(new Thread() { @Override public void run() { try { //获取执行资格 semaphore.acquire(1); System.out.println(finalI+"========="); //模拟每个线程运行的时间 Thread.sleep(1000); //释放执行资格 semaphore.release(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); } poolExecutor.shutdown();
运行结果如下:(同一时刻只能运行三个线程。有点模糊,凑合看)
解析:
一、定义:
public Semaphore(int permits) { sync = new NonfairSync(permits);}
Semaphroe底层也是用Sync类,默认是非公平的,也有公平的构造方法。
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
定义的资格数其实是设置锁的状态值的(AQS之前已说过,维护锁状态值和线程等待队列)
abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } }
二、为什么能限制同时执行的线程数量呢?
这就是acquire方法的用处了
public void acquire(int permits) { sync.acquireSharedInterruptibly(permits);}
点进acquireSharedInterruptibly这个方法看看:
public final void acquireSharedInterruptibly(int arg) { 1、尝试获取锁,返回值小于0就是获取锁失败 if (tryAcquireShared(arg) < 0) 2、如果获取失败,则进入队列进行等待,之前已经解析过 doAcquireSharedInterruptibly(arg); }
可以看到,跟之前CountDownLatch的await方法是一样的。
tryAcquireShared方法最终执行的如下方法:
final int nonfairTryAcquireShared(int acquires) { for (;;) { 1、获取当前锁状态,锁状态值一开始是自定义的 int available = getState(); 2、当前申请后剩余的锁状态值 int remaining = available - acquires; if (3、如小于0,则申请失败,进入等待队列中 remaining < 0 || 4、CAS替换锁状态值 compareAndSetState(available, remaining)) return remaining; } }
上述是非公平的,公平的只加了一个判断线程等待队列前是否有其它线程。排队一个一个来。
static final class FairSync extends Sync { protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }
这个就是为什么Semaphore能控制当前并发线程的数量的原因。
三、释放锁
线程获取执行资格之后需要释放锁。这就是release方法的用处。不释放的话锁会一直被占用,其他线程就无法运行。
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
点进releaseShared看看
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
跟之前的CountDownLatch是一样的,只是实现不一样。Semaphore实现如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { 1、获取锁当前状态 int current = getState(); 2、释放锁,直接相加 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); 3、用CAS更新锁状态 if (compareAndSetState(current, next)) return true; } }
=======================================================
我是Liusy,一个喜欢健身的程序员。
欢迎关注微信公众号【Liusy01】,一起交流Java技术及健身,获取更多干货,最新更新【K8S】。