生产者消费者模式--阻塞队列--LOCK,Condition--线程池

1.阻塞队列:http://www.cnblogs.com/dolphin0520/p/3932906.html

2.Condition 生产者消费者实现 :http://www.cnblogs.com/dolphin0520/p/3920385.html

3. Condition 优点 :http://blog.csdn.net/ghsau/article/details/7481142

3.Lock :http://www.cnblogs.com/dolphin0520/p/3923167.html

4.线程池,实现消费端http://www.cnblogs.com/dolphin0520/p/3932921.html

ArrayBlockingQueue 源码简写

class BoundedBuffer {
final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件 final Object[] items = new Object[100];//缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/; public void put(Object x) throws InterruptedException {
lock.lockInterruptibly(); //可中断锁
try {
while (count == items.length)//如果队列满了
notFull.await();//阻塞写线程
items[putptr] = x;//赋值
if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
} public Object take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0)//如果队列为空
notEmpty.await();//阻塞读线程
Object x = items[takeptr];//取值
if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue。

1. ArrayBlockingQueue
      基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
  ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行(即在读写操作上都需要锁住整个容器,),这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。(要注意对内存需求会更大)而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。(公平保证了锁是非常健壮的锁,有很大的性能成本。要确保公平所需要的记帐(bookkeeping)和同步,就意味着被 争夺的公平锁要比不公平锁的吞吐率更低)

2. LinkedBlockingQueue
      基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue 中 读写锁 和condition 详解

lock 和condition 关系。 lock 是用来提供锁机制的 。而condition 是用来提供线程间的协作即挂起wait和唤醒notify。

列如ArrayBlockingQueue  中只有一把锁, 所以 PUT 和take方法 不能并行执行,同一时刻只有一个线程能获取锁。

如果未使用 condition,直接使用 对象锁wait 和notifyAll 方法, 是不能对被挂起的线程和 需要被唤醒的线程分组的。

设计一个场景解释:

1 .假设ArrayBlockingQueue  中只能放一个对象。初始时,队列为空。

2.同时1个线程(A)调用put,一个调用take(B).但是 tak优先拿到锁。所以另外一个线程被挂起,进入等待LCOK对象锁的队列

3.但是take方法调用时,发现队列为空,所以去执行 notEmpty.await() , 这时,该线程释放LCOK对象锁 ,并进入 Lock条件对象锁的notEmpty 队列。这里可以理解为

notEmpty 队列是LCOK对象锁的队列的子集,用于给正在等待LCOK对象锁的线程分组。

3 .接下来有来个个线程(C)去写入,加上前面被阻塞的线程(A),就同时有两个写入线程在等待锁.

4其中一个(A)没有获取到锁,被阻塞,而另一个(C)获取到LCOK对象锁,然后插入消息后,释放掉LCOK对象锁,并调用notEmpty.signal()。注意,这时候有2个线程在等待 Lock条件对象锁,一个读线程(B),一个写线程(A)。因为 notEmpty.signal() 唤醒的是Lock条件对象锁的notEmpty 队列 中的线程,即B,所以 唤醒的是读线程,而A线程没有被唤起。

这就是condition 的优势,如果采用 以前 对象锁,没有分条件的话,统一notifyall会同时唤起A和B ,让2个线程竞争,而明显,此时之应该唤醒读线程,因为队列已经满了,写线程被唤醒,也只会马上被再次挂起。效率低下。

LinkedBlockingQueue , 采用读写分离锁。 这样也可以避免读写两端同时竞争一把锁。

线程池原理:

执行完任务的线程去任务缓存队列里面取任务来执行

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行(这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。);若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
上一篇:[经验交流] 试用基于 influxdb+kapacitor 的监控系统


下一篇:JavaScript小细节点罗列(2)