Kotlin中有多种方法可以实现多线程生产/消费模型(大多也适用于Java)
- Synchronized
- Lock
- BlockingQueue
- Semaphore
- RxJava
- Coroutine
Synchronized
val buffer = LinkedList<Data>()
val MAX = 5 //buffer最大size
val lock = Object()
fun produce(data: Data) {
sleep(2000) // mock produce
synchronized(lock) {
while (buffer.size >= MAX) {
// 当buffer满时,停止生产
// 注意此处使用while不能使用if,因为有可能是被另一个生产线程而非消费线程唤醒,所以要再次检查buffer状态
// 如果生产消费两把锁,则不必担心此问题
lock.wait()
}
buffer.push(data)
// notify方法只唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。
// notifyAll会唤醒所有等待中线程,哪一个线程将会第一个处理取决于操作系统的实现,但是都有机会处理。
// 此处使用notify有可能唤醒的是另一个生产线程从而造成死锁,所以必须使用notifyAll
lock.notifyAll()
}
}
fun consume() {
synchronized(lock) {
while (buffer.isEmpty())
lock.wait() // 暂停消费
buffer.removeFirst()
lock.notifyAll()
}
sleep(2000) // mock consume
}
// 同时启动多个生产、消费线程
repeat(10) {
Thread { produce(Data()) }.start()
}
repeat(10) {
Thread { consume() }.start()
}
Lock
Lock相对于Synchronized好处是,当有多个生产线/消费线程时,我们可以通过定义多个condition精确指定唤醒哪一个。下面的例子没有这么复杂,只是替换上面Synchronized写法
val buffer = LinkedList<Data>()
val MAX = 5 //buffer最大size
val lock = ReentrantLock()
val condition = lock.newCondition()
fun produce(data: Data) {
sleep(2000) // mock produce
lock.lock()
while (buffer.size >= 5)
condition.await()
buffer.push(data)
condition.signalAll()
lock.unlock()
}
fun consume() {
lock.lock()
while (buffer.isEmpty())
condition.await()
buffer.removeFirst()
condition.singleAll()
lock.unlock()
sleep(2000) // mock consume
}
BlockingQueue
BlockingQueue在达到临界条件时,再进行读写会自动阻塞当前线程等待锁的释放,天然适合这种生产/消费场景
val buffer = LinkedBlockingQueue<Data>(5)
fun produce(data: Data) {
sleep(2000) // mock produce
buffer.put(data) //buffer满时自动阻塞
}
fun consume() {
buffer.take() // buffer空时自动阻塞
sleep(2000) // mock consume
}
Semaphore
Semaphore是JUC提供的一种共享锁机制,可以进行拥塞控制,此特性可用来控制buffer的大小。
// canProduce: 可以生产数量(即buffer可用的数量),生产者调用acquire,减少permit数目
val canProduce = Semaphore(5)
// canConsumer:可以消费数量,生产者调用release,增加permit数目
val canConsume = Semaphore(5)
// 控制buffer访问互斥
val mutex = Semaphore(0)
val buffer = LinkedList<Data>()
fun produce(data: Data) {
if (canProduce.tryAcquire()) {
sleep(2000) // mock produce
mutex.acquire()
buffer.push(data)
mutex.release()
canConsume.release() //通知消费端新增加了一个产品
}
}
fun consume() {
if (canConsume.tryAcquire()) {
sleep(2000) // mock consume
mutex.acquire()
buffer.removeFirst()
mutex.release()
canProduce.release() //通知生产端可以再追加生产
}
}
RxJava
RxJava不太适合用于多生产者多消费者的场景,但是可以用在单生产者/但消费者场景。
Flowable的被压机制可以用来控制buffer数量
class Producer : Flowable<Data>() {
override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) {
subscriber.onSubscribe(object : Subscription {
override fun cancel() {
//...
}
private val outStandingRequests = AtomicLong(0)
override fun request(n: Long) { //收到下游通知,开始生产
outStandingRequests.addAndGet(n)
while (outStandingRequests.get() > 0) {
sleep(2000)
subscriber.onNext(Data())
outStandingRequests.decrementAndGet()
}
}
})
}
}
class Consumer : DefaultSubscriber<Data>() {
override fun onStart() {
request(1)
}
override fun onNext(i: Data?) {
sleep(2000) //mock consume
request(1) //通知上游可以增加生产
}
override fun one rror(throwable: Throwable) {
//...
}
override fun onComplete() {
//...
}
}
@Test
fun test_rxjava() {
try {
val testProducer = Producer)
val testConsumer = Consumer()
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer)
} catch (t: Throwable) {
t.printStackTrace()
}
}
Continue/produce
协程中的Channel具有拥塞控制机制,可以实现生产者消费者之间的通信。
官方基于Channel封装了produce方法,专门处理此类场景
val produce = produce(Dispatchers.IO, 5) {
repeat(10) {
sleep(2000) //mock produce
val data = Data()
send(data) // 超过5个以上没有消费完时,协程挂起
}
}
produce.consumeEach {
sleep(3000) //mock consume
}
后记
所线程的生产者消费者问题,无非就是处理两端的通信,锁、信号量、RxJava背压、协程Channel等都是可以选择的通信方式。