Java Thread系列(四)线程通信

Java Thread系列(四)线程通信

一、传统通信

public static void main(String[] args) {
//volatile实现两个线程间数据可见性
private volatile static List list = new ArrayList(); Thread t1 = new Thread(new Runnable() { // (1)
public void run() {
try {
for(int i = 0; i <10; i++){
list.add(i);
System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1"); Thread t2 = new Thread(new Runnable() { // (2)
public void run() {
while(true){
if(list.size() == 5){
//do something
throw new RuntimeException(Thread.currentThread().getName() +
"线程接到通知 size = " + list.size() + " 线程停止..");
}
}
}
}, "t2"); t1.start();
t2.start();
}
  1. t1 线程不断将生产的数据放入 list 集合中

  2. t2 线程开启 while 循环监听 t1 线程,虽然可以实现 list.size()==5 时实时通知 t2 线程,但太浪费性能,考虑用 await/notify 提高性能,程序执行结果如下:

t1线程添加第1个元素..
t1线程添加第2个元素..
t1线程添加第3个元素..
t1线程添加第4个元素..
t1线程添加第5个元素..
Exception in thread "t2" java.lang.RuntimeException: t2线程接到通知 size = 5 线程停止..
at com.github.binarylei.thread._2_1conn.ListAdvice1$2.run(ListAdvice1.java:35)
at java.lang.Thread.run(Thread.java:745)
t1线程添加第6个元素..
t1线程添加第7个元素..
t1线程添加第8个元素..
t1线程添加第9个元素..
t1线程添加第10个元素..

二、wait/notify 实现通信

/**
* 使用wait/notify方法实现线程单挑通信(注意这两个方法是Object类的方法)
* 1. wait和notity必须配合synchronized关键字使用
* 2. wait方法(关闭线程)释放锁,notify(唤醒线程)方法不释放锁
* 缺点:通知不实时,使用CountDownLatch实现实时通知
*/
public static void main(String[] args) {
private volatile static List list = new ArrayList();
final Object lock = new Object(); Thread t1 = new Thread(new Runnable() { // (1)
public void run() {
try {
synchronized (lock) {
System.out.println("t1启动..");
for(int i = 0; i <10; i++){
list.add(i);
System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
Thread.sleep(500);
if(list.size() == 5){
System.out.println("已经发出通知..");
lock.notify();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1"); Thread t2 = new Thread(new Runnable() { // (2)
public void run() {
synchronized (lock) {
System.out.println("t2启动..");
if(list.size() != 5){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//do something
throw new RuntimeException(Thread.currentThread().getName() +
"线程接到通知 size = " + list.size() + " 线程停止..");
}
}
}, "t2");
}
  1. t1 线程当 list.size()==5lock.notify() 唤醒 t2 线程,注意 wait/notify 必须配合 synchronized 使用

  2. t2 线程调用 lock.wait() 后处于一直阻塞状态,直到 t1 线程调用 lock.notify() 唤醒该线程,倘若没有线程唤醒 t2 线程,那么 t2 线程就一直处于阻塞状态。本例中若 t1 线程先启动,那么 t2 线程调用 lock.wait() 就永远阻塞无法执行。程序执行结果如下:。

t2启动..
t1启动..
t1线程添加第1个元素..
t1线程添加第2个元素..
t1线程添加第3个元素..
t1线程添加第4个元素..
t1线程添加第5个元素..
已经发出通知..
t1线程添加第6个元素..
t1线程添加第7个元素..
t1线程添加第8个元素..
t1线程添加第9个元素..
t1线程添加第10个元素..
Exception in thread "t2" java.lang.RuntimeException: t2线程接到通知 size = 10 线程停止..
at com.github.binarylei.thread._2_1conn.ListAdd2$2.run(ListAdd2.java:51)
at java.lang.Thread.run(Thread.java:745)
  1. 由于 t1 线程 lock.notify() 后不会释放锁,t2 线程虽然被唤醒但不能获取锁,所以通知就不那么实时,只有等 t1 线程执行完成释放锁后 t2 线程才能获得锁执行相应操作,解决方案:使用 CountDownLatch

三、CountDownLatch 实现实时通信


public static void main(String[] args) {
private volatile static List list = new ArrayList();
final CountDownLatch countDownLatch = new CountDownLatch(1); // (1) Thread t1 = new Thread(new Runnable() {
public void run() {
try {
System.out.println("t1启动..");
for(int i = 0; i <10; i++){
list.add(i);
System.out.println(Thread.currentThread().getName() + "线程添加第" + (i + 1) + "个元素..");
Thread.sleep(500);
if(list.size() == 5){
System.out.println("已经发出通知..");
countDownLatch.countDown(); // (2)
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} }
}, "t1"); Thread t2 = new Thread(new Runnable() {
public void run() {
System.out.println("t2启动..");
if(list.size() != 5){
try {
countDownLatch.await(); // (3)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//do something
throw new RuntimeException(Thread.currentThread().getName() +
"线程接到通知 size = " + list.size() + " 线程停止..");
}
}, "t2"); t1.start();
t2.start();
}
  1. CountDownLatch 同步工具类,允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行,参数 1 表示需要等待的线程数量,具体来说就是参数为几就必须调用几次 countDownLatch.countDown()

  2. countDownLatch.countDown() 唤醒线程

  3. countDownLatch.await() 阻塞线程,程序执行结果如下:

t1启动..
t1线程添加第1个元素..
t2启动..
t1线程添加第2个元素..
t1线程添加第3个元素..
t1线程添加第4个元素..
t1线程添加第5个元素..
已经发出通知..
Exception in thread "t2" java.lang.RuntimeException: t2线程接到通知 size = 5 线程停止..
t1线程添加第6个元素..
at com.github.binarylei.thread._2_1conn.ListAdd3$2.run(ListAdd3.java:47)
at java.lang.Thread.run(Thread.java:745)
t1线程添加第7个元素..
t1线程添加第8个元素..
t1线程添加第9个元素..
t1线程添加第10个元素..

四、ThreadLocal

ThreadLocal 是线程局部变量,是一种多线程间并发访问变量的无锁解决方案。

ThreadLocal 和 synchronized 比较?

  1. 与 synchronized 等加锁的方式不同,ThreadLocal 完全不提供锁,而使用以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全。

  2. 从性能上说,ThreadLocal 不具有绝对的优势,在并发不是很高的时候,加锁的性能会更好,但作为一套无锁的解决方案,在高并发量或者竞争激烈的场景,使用 ThreadLocal 可以在一定程度上减少锁竞争。

public static void main(String[] args) throws InterruptedException {
final ThreadLocal<String> th = new ThreadLocal<String>(); Thread t1 = new Thread(new Runnable() {
public void run() {
th.set("张三");
System.out.println(th.get()); // => "张三"
}
}, "t1"); Thread t2 = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
th.set("李四");
System.out.println(th.get()); // => "李四"
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2"); t1.start(); //t1:张三
t2.start(); //t2:李四
}

五、自定义同步类窗口-Queue

Java 提供了一些同步类容器,它们是 线程安全 的,如 Vector、HashTable 等。这些同步类容器是由 Collections.synchronizedMap 等工厂方法去创建实现的,底层使用 synchronized 关键字,每次只有一个线程访问容器。下面实现一个自己的同步类窗口。

import java.util.LinkedList;

public class MyQueue {
private LinkedList list = new LinkedList();
private int max = 5;
private int min = 1;
private Object lock = new Object(); public void put(Object obj) { // (1)
synchronized (lock) {
while (list.size() == max) {
try {
lock.wait();
} catch (InterruptedException e) {
;
}
}
list.add(obj);
lock.notify();
System.out.println("put元素:" + obj);
}
} public Object take() { // (2)
Object obj;
synchronized (lock) {
while (list.size() == min) {
try {
lock.wait();
} catch (InterruptedException e) {
;
}
}
obj = list.removeFirst();
lock.notify();
System.out.println("take元素:" + obj);
}
return obj;
}
}

测试

public static void main(String[] args) {
final MyQueue myQueue = new MyQueue();
myQueue.put("a");
myQueue.put("b");
myQueue.put("c");
myQueue.put("d");
myQueue.put("e"); new Thread(new Runnable() {
@Override
public void run() {
myQueue.put("f");
myQueue.put("g");
myQueue.put("h");
myQueue.put("i");
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
myQueue.take();
myQueue.take();
myQueue.take();
}
}).start();
}

每天用心记录一点点。内容也许不重要,但习惯很重要!

上一篇:[C# 基础知识系列]专题四:事件揭秘


下一篇:[.NET领域驱动设计实战系列]专题十一:.NET 领域驱动设计实战系列总结