wait,会使调用的线程进入等待状态,会释放所持有的对象锁(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException)
notifyAll、notify,会去唤醒应当前对象而等待的线程,(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException)
顺便也记录一下join方法,调用join方法,会使当前线程进入等待,如果没有设置等待时间,就会等待另一个线程执行完成才返回(ps:调用join方法并不一定立刻执行另一个线程,只是当前线程进入等待,然后切换下一个线程)
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 /** 4 * @author lhd 5 */ 6 public class BlockQueue { 7 8 /** 9 * 生产者锁对象 10 */ 11 private final Object addLock = new Object(); 12 13 /** 14 * 消费者锁对象 15 */ 16 private final Object deleteLock = new Object(); 17 18 /** 19 * 队列总大小 20 */ 21 private final Integer size = 30; 22 23 /** 24 * 数据存放 25 */ 26 private Object[] queue = new Object[size]; 27 28 /** 29 * 存放的数量,使用AtomicInteger是因为普通的int递增递减操作会存在非原子性的问题,会使数量异常 30 */ 31 private AtomicInteger count = new AtomicInteger(0); 32 33 /** 34 * 生产 35 * @param o 对象 36 */ 37 public void add(Object o) { 38 //获取生产锁,wait方法必须获取到对象锁后才可以调用,否则抛出异常 39 synchronized (addLock){ 40 41 //判断是否超过队列大小,超过则进入等待 42 while (count.get() >= size){ 43 try { 44 addLock.wait(); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 } 49 //存放一个 50 queue[count.get()] = o; 51 //递增 52 int i = count.incrementAndGet(); 53 //打印一下日志 54 String name = Thread.currentThread().getName(); 55 System.out.println(name + "生产了一个,现有数量" + i); 56 } 57 //如果队列有数据,则调用notifyAll唤醒消费者 58 if (count.get() >= 1){ 59 //notifyAll、notify都需要先获取对象锁,否则会抛出异常 60 synchronized (deleteLock){ 61 deleteLock.notifyAll(); 62 } 63 } 64 65 } 66 67 /** 68 * 消费 69 * @return 70 */ 71 public Object poll(){ 72 Object o; 73 //先获取对象锁,和生产者类似 74 synchronized (deleteLock){ 75 //队列里没有数据则等待 76 while (count.get() <= 0){ 77 try { 78 deleteLock.wait(); 79 } catch (InterruptedException e) { 80 e.printStackTrace(); 81 } 82 } 83 //获取数据 84 o = queue[count.get()]; 85 //递减 86 int i = count.decrementAndGet(); 87 String name = Thread.currentThread().getName(); 88 System.out.println(name + "消费了一个,现有数量" + i); 89 } 90 //如果队列没有满,则可以唤醒生产者 91 if (count.get() < size){ 92 //需要先获取到锁 93 synchronized (addLock){ 94 addLock.notifyAll(); 95 } 96 } 97 return o; 98 } 99 100 /** 101 * 简单的测试 102 * @param args 103 */ 104 public static void main(String[] args) { 105 BlockQueue blockQueue = new BlockQueue(); 106 Thread t1 = new Thread(()-> {while (true){blockQueue.add(new Object());}}); 107 Thread t2 = new Thread(()-> {while (true){blockQueue.add(new Object());}}); 108 Thread t3 = new Thread(()-> {while (true){blockQueue.add(new Object());}}); 109 Thread t4 = new Thread(()-> {while (true){blockQueue.poll();}}); 110 Thread t5 = new Thread(()-> {while (true){blockQueue.poll();}}); 111 Thread t6 = new Thread(()-> {while (true){blockQueue.poll();}}); 112 t1.start(); 113 t2.start(); 114 t3.start(); 115 t4.start(); 116 t5.start(); 117 t6.start(); 118 } 119 120 }
效果:其实这个递增递减操作和打印操作也不是原子操作,
依次打印线程1,2,3
1 /** 2 * @author lhd 3 */ 4 public class JoinTest { 5 6 7 public static void main(String[] args) throws InterruptedException { 8 Thread t1 = new Thread(() -> System.out.println(1)); 9 Thread t2 = new Thread(()-> System.out.println(2)); 10 Thread t3 = new Thread(()-> System.out.println(3)); 11 12 t1.start(); 13 t1.join(); 14 15 t2.start(); 16 t2.join(); 17 18 t3.start(); 19 t3.join(); 20 21 } 22 23 }