Semaphore:限流器的底层模型你知道吗?

Semaphore:限流器的底层模型你知道吗?

信号量模型

信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()
Semaphore:限流器的底层模型你知道吗?

  • init():设置计数器的初始值。
  • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当
    前线程可以继续执行
  • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个
    线程,并将其从等待队列中移除

这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模
型的实现方保证的
。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore
实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

class Semaphore{
   // 计数器
   int count;
   // 等待队列
   Queue queue;
   // 初始化操作
   Semaphore(int c){
     this.count=c;
   }
   void down(){
     this.count--;
     if(this.count<0){
     // 将当前线程插入等待队列
     // 阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
    // 移除等待队列中的某个线程 T
    // 唤醒线程 T
    }
  }
}

信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V操作,所以信号量模型也被称为 PV 原语。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和release()。

Semaphore的使用

其实你想想红绿灯就可以了。十字路口的红绿灯可以控制交通,得益于它的一个关键规则:车辆在通过路口前必须先检查是否是绿灯,只有绿灯才能通行。

这里我们还是用累加器的例子来说明信号量的使用吧。在累加器的例子里面,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。

就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作就可以了。下面是 Java 代码的示例,acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作。

static int count;
// 初始化信号量
static final Semaphore s = new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
 s.acquire();
 try {
   count+=1;
 } finally {
   s.release();
 }
}

假设两个线程 T1 和 T2 同时访问addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0,另外一个线程(T2)则是将计数器减为 -1。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0,按照信号量模型里对down() 操作的描述,线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count+=1;
当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加
1 之后的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的
T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从
而保证了互斥性。

限流器的实现

估计你会觉得奇怪,既然有Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区

比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的

下面以一个对象池为例进行讲解:

class ObjPool<T, R> {
   final List<T> pool;
   // 用信号量实现限流器
   final Semaphore sem;
   // 构造函数
   ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
    pool.add(t);
   }
   sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用 func
  R exec(Function<T,R> func) {
  T t = null;
  sem.acquire();
  try {
    t = pool.remove(0);
    return func.apply(t);
  } finally {
  pool.add(t);
  sem.release();
  }
 }
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
  System.out.println(t);
  return t.toString();
});

我们用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里
面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用
acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是
10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执
行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线
程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),
分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调
函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用
release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说
明有线程在等待,此时会自动唤醒等待的线程。

总结:
觉得有用的客官可以点赞、关注下!感谢支持

上一篇:MyEclipse安装JS代码提示(Spket插件)


下一篇:在mac上安装nodejs