--用wait notifyAll来实现生产者与消费者模式,如下
package com.collonn.procon2; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicInteger; public class PCTest { // the max number of product in product pool public static final int QUEUE_MAX_SIZE = 3; // product pool public static final LinkedList<Integer> QUEUE = new LinkedList<Integer>(); // product name public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(); // the speed of producer public static final int PRODUCE_SPEED = 1000 * 1; // the speed of consumer public static final int CONSUME_SPEED = 100 * 1; // the number of producer public static final int PRODUCE_COUNT = 1; // the number of consumer public static final int CONSUME_COUNT = 5; // create producer private void produce() { for (int i = 0; i < PRODUCE_COUNT; i++) { new Thread(new Producer()).start(); } } // create consumer private void consume() { for (int i = 0; i < CONSUME_COUNT; i++) { Thread thread = new Thread(new Consumer()); thread.setName("td" + i); thread.start(); } } public static void main(String[] args) throws Exception { PCTest pct1 = new PCTest(); System.out.println("start producer..."); pct1.produce(); Thread.sleep(1000 * 5); System.out.println("start consumer..."); pct1.consume(); } } // producer class Producer implements Runnable { @Override public void run() { while (true) { try { synchronized (PCTest.QUEUE) { if(PCTest.QUEUE.size() == PCTest.QUEUE_MAX_SIZE){ System.out.println("product pool full..."); PCTest.QUEUE.wait(); }else{ int next = PCTest.ATOMIC_INTEGER.getAndAdd(1); PCTest.QUEUE.addLast(next); System.out.println("produce," + next); PCTest.QUEUE.notifyAll(); } } Thread.sleep(PCTest.PRODUCE_SPEED); } catch (InterruptedException e) { e.printStackTrace(); } } } } // consumer class Consumer implements Runnable { @Override public void run() { try { while (true) { synchronized (PCTest.QUEUE) { if(PCTest.QUEUE.size() == 0){ System.out.println("product pool empty..."); PCTest.QUEUE.wait(); }else{ Integer data = PCTest.QUEUE.removeFirst(); System.out.println("consume," + Thread.currentThread().getName() + "," + data); PCTest.QUEUE.notifyAll(); } } Thread.sleep(PCTest.CONSUME_SPEED); } } catch (InterruptedException e) { e.printStackTrace(); } } }
--用BlockingDeque来实现生产者与消费者模式,如下
package com.collonn.procon; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; public class ProConMain { // product pool public static final BlockingDeque<Integer> BLOCK_QUEUE = new LinkedBlockingDeque<Integer>(5); // product name public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(); // the speed of producer public static final int PRODUCE_SPEED = 1000 * 3; // the speed of consumer public static final int CONSUME_SPEED = 1000 * 1; // the number of producer public static final int PRODUCE_COUNT = 1; // the number of consumer public static final int CONSUME_COUNT = 20; // create producer private void produce() { for (int i = 0; i < PRODUCE_COUNT; i++) { new Thread(new Producer()).start(); } } // create consumer private void consume() { for (int i = 0; i < CONSUME_COUNT; i++) { Thread thread = new Thread(new Consumer()); thread.setName("td" + i); thread.start(); } } public static void main(String[] args) { ProConMain t1 = new ProConMain(); t1.produce(); t1.consume(); } } // producer class Producer implements Runnable { @Override public void run() { while (true) { try { int next = ProConMain.ATOMIC_INTEGER.getAndAdd(1); ProConMain.BLOCK_QUEUE.putLast(next); System.out.println("produce," + next); Thread.sleep(ProConMain.PRODUCE_SPEED); } catch (InterruptedException e) { e.printStackTrace(); } } } } // consumer class Consumer implements Runnable { @Override public void run() { try { while (true) { Integer data = ProConMain.BLOCK_QUEUE.takeFirst(); System.out.println("consume," + Thread.currentThread().getName() + "," + data); Thread.sleep(ProConMain.CONSUME_SPEED); } } catch (InterruptedException e) { e.printStackTrace(); } } }
一:用Executor来实现生产者与消费者模式,如下
package com.collonn.executor; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ProConExecutor { // product name public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(); // the speed of producer public static final int PRODUCE_SPEED = 1000 * 1; // the speed of consumer public static final int CONSUME_SPEED = 1000 * 5; // the speed of consumer public static final int BLOCK_QUEUE_SIZE = 5; // create thread pool with deal strategy // corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, rejectedExecutionHandler public static final Executor EXECUTOR = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_SIZE), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { System.out.println("[overload], ignore data:" + ((Taskk)r).getData() + ", poolSize:" + e.getPoolSize() + ", queueSize:" + e.getQueue().size()); } }); public static void main(String[] args) { // produce product, then, throw to thead pool, then deal the data new Thread() { @Override public void run() { try { while (true) { int next = ProConExecutor.ATOMIC_INTEGER.getAndAdd(1); ProConExecutor.EXECUTOR.execute(new Taskk(next)); System.out.println("produce," + next); Thread.sleep(PRODUCE_SPEED); } } catch (Exception e) { e.printStackTrace(); } } }.start(); } } // process data class Taskk implements Runnable { private Integer data; public Taskk(Integer data) { this.data = data; } public Integer getData() { return data; } public void setData(Integer data) { this.data = data; } @Override public void run() { try { System.out.println("consume," + this.data); Thread.sleep(ProConExecutor.CONSUME_SPEED); } catch (InterruptedException e) { e.printStackTrace(); } } }
总结:我们可以看到,代码在一步步的精简,且更优雅。