java线程间通信:一个小Demo完全搞懂

版权声明:本文出自汪磊的博客,转载请务必注明出处。

Java线程系列文章只是自己知识的总结梳理,都是最基础的玩意,已经掌握熟练的可以绕过。

一、从一个小Demo说起

上篇我们聊到了Java多线程的同步机制:Java多线程同步问题:一个小Demo完全搞懂。这篇我们聊一下java多线程之间的通信机制。

上一篇探讨java同步机制的时候我们举得例子输出log现象是:一段时间总是A线程输出而另一段时间总是B线程输出,有没有一种方式可以控制A,B线程交错输出呢?答案是当然可以了,这时候我们就要用到多线程的wait/notify机制了。

wait/notify机制就是当线程A执行到某一对象的wait()方法时,就会进入等待状态,此时线程A放弃持有的锁,其余线程可以竞争锁的持有权。当有其余线程调用notify()或者notifyAll()方法的时候就可能(当有多个线程的时候notify()方法只会唤醒处于等待状态线程中的一个)唤醒线程A,使其从wait状态醒来,继续向下执行业务逻辑。

接下来,我们通过一个小demo加以理解。

二、单生产者消费者模式

demo很简单,就是开启两个线程,一个生产面包,另一个负责消费面包,并且生产一个就要消费一个,交替执行。

首先看下BreadFactory类:

 public class BreadFactory {
//生产面包个数计数器
private int count = 0;
//线程的锁
private Object o = new Object();
private boolean flag = false; public void product() {
synchronized (o) {
if (flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产了第" + (++count) + "个面包");
flag = true;
o.notify();
}
} public void consume() {
synchronized (o) {
if (!flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"消费第" + count + "个面包");
flag = false;
o.notify();
}
}
}

此类就是负责生产,消费面包,flag主要用于控制线程之间的切换。

接下来我们看下Producter,Consumer类:

 public class Producter extends Thread {

     private BreadFactory mBreadFactory;

     public Producter(BreadFactory mBreadFactory) {
super();
this.mBreadFactory = mBreadFactory;
} @Override
public void run() {
//
while (true) {
mBreadFactory.product();
}
}
}

很简单,初始化的时候需要传递进来一个BreadFactory实例对象,线程启动的时候调用BreadFactory类中product()方法不停生产面包。

Consumer类同理:

 public class Consumer extends Thread {

     private BreadFactory mBreadFactory;

     public Consumer(BreadFactory mBreadFactory) {
super();
this.mBreadFactory = mBreadFactory;
} @Override
public void run() {
//
while (true) {
mBreadFactory.consume();
}
}
}

最后看下main方法:

 public static void main(String[] args) {
//
BreadFactory factory = new BreadFactory();
Producter p1 = new Producter(factory);
p1.start();
Consumer c1 = new Consumer(factory);
c1.start();
}

没什么要多说的,就是初始化并启动线程,运行程序,输出如下:

Thread-0生产了第1个面包
Thread-1消费第1个面包
Thread-0生产了第2个面包
Thread-1消费第2个面包
Thread-0生产了第3个面包
Thread-1消费第3个面包
Thread-0生产了第4个面包
Thread-1消费第4个面包
。。。。。

三、多生产者消费者模式

似乎很顺利的就实现了啊,但是实际需求中怎么可能只有一个生产者,一个消费者,生产者,消费者是有多个的,我们试下多个生产者,消费者是什么现象,修改main中逻辑:

 public static void main(String[] args) {
//
BreadFactory factory = new BreadFactory();
Producter p1 = new Producter(factory);
p1.start();
Consumer c1 = new Consumer(factory);
c1.start();
Producter p2 = new Producter(factory);
p2.start();
Consumer c2 = new Consumer(factory);
c2.start();
}

我们就是只多添加了一个生产者和一个消费者,其余没任何变化。

运行程序,输出信息如下:

。。。
Thread-2生产了第4个面包
Thread-1消费第4个面包
Thread-2生产了第5个面包
Thread-1消费第5个面包
Thread-2生产了第6个面包
Thread-1消费第6个面包
Thread-3消费第6个面包
Thread-0生产了第7个面包
Thread-3消费第7个面包
。。。

咦?生产到第6个面包,竟然被消费了两次,这显然是不正常的,那是哪里出问题了呢?

四、多生产者消费者模式问题产生原因分析

接下来,我们直接分析问题产生的原因,我们分析下BreadFactory中product()与consume()方法:

 public void product() {
synchronized (o) {
if (flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产了第" + (++count) + "个面包");
flag = true;
o.notify();
}
} public void consume() {
synchronized (o) {
if (!flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"消费第" + count + "个面包");
flag = false;
o.notify();
}
}

从线程启动顺序以及打印信息可以看出线程0,线程2负责生产面包,线程1,线程3负责消费面包。

线程执行过程中,线程1消费掉第5个面包,此时flag置为false,执行notify()方法唤醒其余线程争取锁获取执行权。

此时线程3获取线程执行权,执行consume()业务逻辑flag此时为false,进入if(!flag)逻辑,执行wait()方法,此时线程3进入wait状态,停留在25行代码处。释放锁资源,其余线程可以争取执行权。

此时线程1获取执行权,和线程3一样,最终停留在25行代码处。释放锁资源,其余线程可以争取执行权。注意:此时线程1,线程3都停留在25行代码处,处于wait状态。

接下来线程2获取执行权,执行生产业务,生产了第6个面包,然后释放锁资源,其余线程可以争取执行权。

然后线程1又获取执行权,上面说了线程1停留在25行代码处,现在获取执行权从25行代码处开始执行,消费掉第6个面包没问题,flag置为false。然后释放锁资源,其余线程可以争取执行权。

此时线程3又获取执行权,上面分析时说了线程3处于25行代码处wait状态,现在获取执行权从25行代码处开始执行,又消费了第6个面包,到这里面包6被消耗了两次。

经过上面分析已经知道产生问题的原因了,线程获取执行权后直接从wait处开始继续执行,不在检查if条件是否成立,这里就是问题产生的原因了。

那怎么修改的呢?很简单了,将if判断改为while条件判断就可以了,这样线程获取执行权后还会再次检查while条件判断是否成立。

运行程序打印Log如下:

 。。。
Thread-1消费第19个面包
Thread-0生产了第20个面包
Thread-1消费第20个面包
Thread-2生产了第21个面包

看输出Log上面问题是解决了,生产一个面包只会消费一次,但是发现程序运行自己终止了,上面生产到第21个面包程序似乎不运行了没Log输出了,这是什么原因呢?

五、notify()通知丢失问题以及notify()与notifyAll()的区别

要想明白上述问题产生的原因我们就必须搞懂notify()与notifyAll()的区别。简单说就是notify()只会唤醒同一监视器处于wait状态的一个线程(随机唤醒),

而notifyAll()会唤醒同一监视器处于wait状态的所有线程。

我们分析上面问题产生的原因:线程0,线程2负责生产面包,线程1,线程3负责消费面包,在程序运行过程存在如下情况:

线程1,3处于consume()中的wait()处,线程0处于product()中wait()处,此时线程2生产完第21个面包执行notify()方法,通知处于同一监视器下处于wait状态线程,此时处于wait状态线程为线程1,线程3与线程0,按理说我们是想唤醒一个线程1,3中一个线程来消费刚刚生产的面包,但是程序可不知道啊,调用notify方法随机唤醒一个线程,碰巧此时唤醒的还是生产线程0,这就是notify通知丢失问题,线程0执while判断又处于wait状态了,到这里就出现了控制台没有Log输出现象了,经过上面分析我们该明白问题出现的原因就是notify通知丢失问题,通知了一个我们不想通知的线程,那怎么解决呢?很简单了,程序中notify()方法改为notifyAll()就可以了,改为notifyAll()方法上述线程2通知的时候会一起唤醒线程0,1,3,也就是唤醒同一监视器处于wait状态的所有线程,到这里运行程序就没有什么问题了。

六、notify()与notifyAll()性能问题

也许有些同学有疑问了,既然notify()方法会产生问题,那我就用notifyAll()不就完了,直接屏蔽掉notify()方法。这样做当然是很Low的做法。

假设有N个线程在wait状态下,调用notifyall会唤醒所有线程,然后这N个线程竞争同一个锁,最后只有一个线程能够得到锁,其它线程又回到wait状态。这意味每一次唤醒操作可能带来大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。如果说总是只有一个线程被唤醒后能够拿到锁,这种情况下使用notify的性能是要高于notifyall的。

七、JDK1.5中Condition通知机制

JDK1.5中Condition通知机制这里就不详细讲解了,Condition中await(),signal(),signalAll()相当于传统线程通信机制中wait(),notify(),notifyAll()方法。

我们修改BreadFactory类如下,其余类均不变:

 public class BreadFactory {
// 生产面包个数计数器
private int count = 0;
// 线程的锁
private Lock lock = new ReentrantLock();
private Condition consumeCon = lock.newCondition();
private Condition productCon = lock.newCondition();
private boolean flag = false; public void product() {
lock.lock();
try {
while (flag) {
try {
productCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "生产了第"
+ (++count) + "个面包");
flag = true;
consumeCon.signal();
} finally {
//
lock.unlock();
}
} public void consume() {
lock.lock();
try {
while (!flag) {
try {
consumeCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "消费第" + count
+ "个面包");
flag = false;
productCon.signal();
} finally {
//
lock.unlock();
}
}
}

其强大之处就在于代码中6,7,15,28,40,53行代码处,我们并没有调用signalAll()方法,而是调用的signal()方法。

这样我们就可以控制在生产完一个面包去唤醒消费的线程来消费面包,而不用连同生产线程一起唤醒,这就是其强大之处,这里就不详细分析了,不太熟悉的同学可自行搜索其余博客学习一下,比较简单,但是很基础很重要的。

关于线程间通信问题本篇到此就结束了,再说一次,多线程相关博客没什么新玩意,只是自己工作以来一次总结,虽然基础,枯燥,但是比较重要,希望本篇博客对您有用。

声明:文章将会陆续搬迁到个人公众号,以后文章也会第一时间发布到个人公众号,及时获取文章内容请关注公众号

java线程间通信:一个小Demo完全搞懂

上一篇:转:彻底搞清楚javascript中的require、import和export


下一篇:freeswitch刷新网关方法汇总