生产者消费者
import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class TaskExecutor { private List<Integer> list = new ArrayList<>(); private AtomicInteger count = new AtomicInteger(); private ReentrantLock lock = new ReentrantLock(); /** * 积压消息容量 */ private int capacity; private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); /** * 可读索引 */ private int read; public TaskExecutor(int capacity) { this.capacity = capacity; } public Integer product() { Integer value = null; lock.lock(); try { while (list.size() >= capacity) { c1.await(); } value = count.incrementAndGet(); list.add(value); read++; c2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return value; } public Integer consume() { Integer value = null; lock.lock(); try { while (list.size() <= 0) { c2.await(); } read--; value = list.remove(read); c1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return value; } private Integer getSize() { lock.lock(); try { return list.size(); } finally { lock.unlock(); } } public static void main(String[] args) { TaskExecutor taskExecutor = new TaskExecutor(20); new Thread(()-> { for (;;) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 生产: " + taskExecutor.product() + " 积压: " + taskExecutor.getSize()); } }, "producer").start(); new Thread(()-> { for (;;) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 消费: " + taskExecutor.consume() + " 积压: " + taskExecutor.getSize()); } }, "consumer").start(); } }