我自己根据理解写了4种,可能会存在问题,望指正!
问题描述:
开5个线程,作为生产者线程,每个线程往容器里放3个;
开5个消费者线程,每个线程从容器里get3次;
如果容器满了,生产者阻塞;
如果容器空了,消费者阻塞。
1.用一个普通的数据结构如ArrayList来实现。
利用synchronized和Object自带的wait()/notifyAll()方法实现。
public static void main(String[] args) {
List<Integer> c=new ArrayList<Integer>(5);
//producer
for(int i=0;i<5;i++) {
new Thread(()->{
for(int j=0;j<3;j++) {
while(true) {
synchronized(c) {
if(c.size()<5) {
c.add(j);
System.out.println(Thread.currentThread().getName()+"product "+j);
c.notifyAll();
break;
}else {
try {
c.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
},String.valueOf(i)).start();
}
for(int i=0;i<5;i++) {
new Thread(()->{
for(int j=0;j<3;j++) {
while(true) {
synchronized(c){
if(c.size()>0) {
int res=c.get(c.size()-1);
c.remove(c.size()-1);
System.out.println(Thread.currentThread().getName()+"comsume "+res);
c.notifyAll();
break;
}else {
try {
c.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
},String.valueOf(-i)).start();
}
}
在写的过程中遇到了java.lang.IllegalMonitorStateException,也记录下,这个Exception是在调用object的wait和notify,notifyAll方法的时候可能会出现的异常。注意wait/notify的方法要放在同步代码块或同步方法里(synchronized里)。
2.实现一个并发阻塞的容器
如果每次都需要用外部程序来实现一个生产者消费者的case,有点低效,所以如果实现一个容器来完成这些工作会更好。
class Container<T>{
private int cap;
private List<T> list=null;
public Container(int n) {
cap=n;
list=new ArrayList<>();
}
public synchronized boolean put(T n) {
while(true) {
if(list.size()<cap) {
list.add(n);
System.out.println(Thread.currentThread().getName()+"product "+n);
notifyAll();
break;
}else {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return true;
}
public synchronized boolean take() {
while(true) {
if(list.size()>0) {
T res=list.get(list.size()-1);
list.remove(list.size()-1);
System.out.println(Thread.currentThread().getName()+"comsume "+res);
notifyAll();
break;
}else {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return true;
}
public int size() {
return list.size();
}
}
使用:
public static void main(String[] args) {
Container<Integer> c=new Container<>(5);
for(int i=0;i<5;i++) {
new Thread(()->{
for(int j=0;j<3;j++) {
c.put(j);
}
},String.valueOf(i)).start();
new Thread(()->{
for(int j=0;j<3;j++) {
c.take();
}
},String.valueOf(i)).start();
}
}
循环部分优化的写法:
public synchronized void put(T t) {
while(list.size()==column) {
try {
//notifyAll(); 满了才会叫醒消费者
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
notifyAll(); //只要有生产就叫醒comsumer
System.out.println("product 第"+list.size()+"个food");
}
public synchronized T get() {
while(list.size()==0) {
try {
notifyAll();
this.wait();
} catch (InterruptedException e) {
System.out.println("容器空了");
}
}
notifyAll();//只要有消耗就叫醒producer
System.out.println("comsume 第"+list.size()+"个food");
return list.remove(list.size()-1);
}
3.利用Lock和Condition方法来实现
class Container<T>{
private int cap;
private List<T> list=null;
Lock lock=new ReentrantLock();
Condition producer=lock.newCondition();
Condition consumer=lock.newCondition();
public Container(int n) {
cap=n;
list=new ArrayList<>();
}
public boolean put(T n) {
lock.lock();
while(list.size()==cap) {
try {
producer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(n);
System.out.println(Thread.currentThread().getName()+"product "+n);
consumer.signalAll();
lock.unlock();
return true;
}
public boolean take() {
lock.lock();
while(list.size()==0) {
try {
consumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T res=list.get(list.size()-1);
list.remove(list.size()-1);
System.out.println(Thread.currentThread().getName()+"comsume "+res);
producer.signalAll();
lock.unlock();
return true;
}
public int size() {
return list.size();
}
}
使用方法同方法2.
4.BlockingQueue实现
其实上面2,3相当于是BlockingQueue的功能(暂不考虑FIFO/LIFO的问题)。因此用BlockingQueue来实现生产者消费者。
public static void main(String[] args) {
BlockingQueue<Integer> c=new ArrayBlockingQueue<>(5);
for(int i=0;i<5;i++) {
new Thread(()->{
for(int j=0;j<3;j++) {
try {
c.put(j);
System.out.println(Thread.currentThread().getName()+"product "+j);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},String.valueOf(i)).start();
new Thread(()->{
for(int j=0;j<3;j++) {
try {
int x=c.take();
System.out.println(Thread.currentThread().getName()+"consume "+x);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},String.valueOf(i)).start();
}
}
总结
1.为什么wait外面用while不能用if?
因为wait()是会释放锁的,那么即使后来,被消费者叫醒,它也是没有锁的,需要重新去抢锁。既然有多个生产者,多个生产者都会去抢,假设其中一个抢到了,并且把容器又做满了,那么其他生产者再抢到时,如果是if,那么它不会再去判断了,而是继续执行wait()后面的代码,这样可能就超了。所以要用while,wait醒来抢到锁之后还要再检查一遍有没有被其他被叫醒的生产者填满。
这里即使用if double check也是一样不行。因为if(size==column)那还得重新wait,醒来又是和上面一样的问题了。
wait()在99%的情况下都是和while搭配!
2.为什么要用notifyAll,而不能用notify()
因为notify是随机叫醒一个线程,假如说现在容器里没有东西了,生产者生产了一个,叫醒了一个消费者,这个消费者消费了,然后又没了,去notify,但是这时叫醒的是另外一个消费者,这个消费者一看没有就会一直等,也不会再叫醒其他人了。就会陷入死循环。
尽量用notifyAll(),少用notify()(除非个别情况,如只有一个线程)
更好的写法就是用Condition的await和signal了,有针对的唤醒。
3.并发逻辑wait写在main里行不行?
我觉得是可以的,但是这样每个线程都要写一份这个代码,重复了。写在并发容器里,可以提高代码利用率。
Irisohohoh 发布了15 篇原创文章 · 获赞 0 · 访问量 2095 私信 关注