Java中的多线程技术全面详解

本文主要从整体上介绍Java中的多线程技术,对于一些重要的基础概念会进行相对详细的介绍,若有叙述不清晰或是不正确的地方,希望大家指出,谢谢大家:)

为什么使用多线程

并发与并行

我们知道,在单核机器上,“多进程”并不是真正的多个进程在同时执行,而是通过CPU时间分片,操作系统快速在进程间切换而模拟出来的多进程。我们通常把这种情况成为并发,也就是多个进程的运行行为是“一并发生”的,但不是同时执行的,因为CPU核数的限制(PC和通用寄存器只有一套,严格来说在同一时刻只能存在一个进程的上下文)。
现在,我们使用的计算机基本上都搭载了多核CPU,这时,我们能真正的实现多个进程并行执行,这种情况叫做并行,因为多个进程是真正“一并执行”的(具体多少个进程可以并行执行取决于CPU核数)。综合以上,我们知道,并发是一个比并行更加宽泛的概念。也就是说,在单核情况下,并发只是并发;而在多核的情况下,并发就变为了并行。下文中我们将统一用并发来指代这一概念。

阻塞与非阻塞

UNIX系统内核提供了一个名为read的函数,用来读取文件的内容:

1
2
typedef ssize_t int;typedef size_t unsigned;ssize_t read(int fd,
    void *buf, size_t n);

这个函数从描述符为fd的当前文件位置复制至多n个字节到内存缓冲区buf。若执行成功则返回读取到的字节数;若失败则返回-1。read系统调用默认会阻塞,也就是说系统会一直等待这个函数执行完毕直到它产生一个返回值。然而我们知道,磁盘通常是一种慢速I/O设备,这意味着我们用read函数读取磁盘文件内容时,往往需要比较长的时间(相对于访问内存来说)。那么阻塞的时候我们当然不想让系统傻等着,我们想在这期间做点儿别的事情,等着磁盘准备好了通知我们一下,我们再来读取文件内容。实际上,操作系统正是这样做的。当阻塞在read这类系统调用中的时候,操作系统通常都会让该进程暂时休眠,调度一个别的进程来执行,以免干等着浪费时间,等到磁盘准备好了可以让我们来进行I/O了,它会发送一个中断信号通知操作系统,这时候操作系统重新调度原来的进程来继续执行read函数。这就是通过多进程实现的并发。

多进程 vs 多线程

进程就是一个执行中的程序实例,而线程可以看作一个进程的最小执行单元。线程与进程间的一个显著区别在于每个进程都有一整套变量,而同一个进程间的多个线程共享该进程的数据。也就是说在通常情况下,多线程在数据共享上要比多进程更加便捷。
然而,有时候,多线程共享数据的便捷容易可能会成为一个让我们头疼的问题,我们在后文中会具体提到常见的问题及相应的解决方案。在上面的read函数的例子中,如果我们使用多线程,可以使用一个主线程去进行I/O的工作,再用一个或几个工作线程去执行一些轻量计算任务,这样当主线程阻塞时,线程调度程序会调度我们的工作线程来执行计算任务,从而更加充分的利用CPU时间片。而且,在多核机器上,我们的多个线程可以并行执行在多个核上,进一步提升效率。

如何使用多线程

线程执行模型

每个进程刚被创建时都只含有一个线程,这个线程通常被称作主线程(main thread)。而后随着进程的执行,若遇到创建新线程的代码,就会创建出新线程,而后随着新线程被启动,多个线程就会并发地运行。某时刻,主线程阻塞在一个慢速系统调用中(比如前面提到的read函数),这时线程调度程序会让主线程暂时休眠, 调度另一个线程来作为当前运行的线程。

创建一个新线程

通过实现Runnable接口

在Java中,有两种方法可以创建一个新线程。第一种方法是定义一个实现Runnable接口的类并实例化,然后将这个对象传入Thread的构造器来创建一个新线程,如以下代码所示:

1
2
3
4
5
6
7
8
class MyRunnable implements Runnable {
  ...
  public void run() {
    //这里是新线程需要执行的任务
  }
}
Runnable r = new MyRunnable();
Thread t = new Thread(r);
通过继承Thread类

第二种创建一个新线程的方法是直接定义一个Thread的子类并实例化,从而创建一个新线程。比如以下代码:

1
2
3
4
5
class MyThread extends Thread {
  public void run() {
    //这里是线程要执行的任务
  }
}

创建了一个线程对象后,我们直接对其调用start方法即可启动这个线程:

1
t.start();
两种方式的比较

既然有两种方式可以创建线程,那么我们该使用哪一种呢?首先,直接继承Thread类的方法看起来更加方便,但它存在一个局限性:由于Java中不允许多继承,我们自定义的类继承了Thread后便不能再继承其他类,这在有些场景下会很不方便;实现Runnable接口的那个方法虽然稍微繁琐些,但是它的优点在于自定义的类可以继承其他的类。

线程的属性

线程的状态

线程在它的生命周期中可能处于以下几种状态之一:

  • New(新生):线程对象刚刚被创建出来;
  • Runnable(可运行):在线程对象上调用start方法后,相应线程便会进入Runnable状态,若被线程调度程序调度,这个线程便会成为当前运行(Running)的线程;
  • Blocked(被阻塞):若一段代码被线程A”上锁“,此时线程B尝试执行这段代码,线程B就会进入Blocked状态;
  • Waiting(等待):当线程等待另一个线程通知线程调度器一个条件时,它本身就会进入Waiting状态;
  • Time Waiting(计时等待):计时等待与等待的区别是,线程只等待一定的时间,若超时则不再等待;
  • Terminated(被终止):线程的run方法执行完毕或者由于一个未捕获的异常导致run方法意外终止会进入Terminated状态。

后文中若不加特殊说明的话,我们会用阻塞状态统一指代Blocked、Waiting、Time Waiting。

线程的优先级

在Java中,每个线程都有一个优先级,默认情况下,线程会继承它的父线程的优先级。可以用setPriority方法来改变线程的优先级。Java中定义了三个描述线程优先级的常量:MAX_PRIORITY、NORM_PRIORITY、MIN_PRIORITY。
每当线程调度器要调度一个新的线程时,它会首先选择优先级较高的线程。然而线程优先级是高度依赖于操作系统的,在有些系统的Java虚拟机中,甚至会忽略线程的优先级。因此我们不应该将程序逻辑的正确性依赖于优先级。线程优先级相关的API如下:

1
2
void setPriority(int newPriority) //设置线程的优先级,可以使用系统提供的三个优先级常量
static void yield() //使当前线程处于让步状态,这样当存在其他优先级大于等于本线程的线程时,线程调度程序会调用那个线程

Thread类

Thread实现了Runnable接口,关于这个类的以下实例域需要我们了解:

1
2
3
4
private volatile char name[]; //当前线程的名字,可在构造器中指定
private int priority; //当前线程优先级
private Runnable target; //当前要执行的任务
private long tid; //当前线程的ID

Thread类的常用方法除了我们之前提到的用于启动线程的start外还有:

  • sleep方法: 这是一个静态方法,作用是让当前线程进入休眠状态(但线程不会释放已获取的锁),这个休眠状态其实就是我们上面提到过的Time Waiting状态,从休眠状态“苏醒”后,线程会进入到Runnable状态。sleep方法有两个重载版本,声明分别如下:
1
2
3
4
//让当前线程休眠millis指定的毫秒数
public static native void sleep(long millis) throws InterruptedException;
 //在毫秒数的基础上还指定了纳秒数,控制粒度更加精细
public static native void sleep(long millis, int nanos) throws InterruptedException;
  • join方法: 这是一个实例方法,在当前线程中对一个线程对象调用join方法会导致当前线程停止运行,等那个线程运行完毕后再接着运行当前线程。也就是说,把当前线程还没执行的部分“接到”另一个线程后面去,另一个线程运行完毕后,当前线程再接着运行。join方法有以下重载版本:
1
2
3
public final synchronized void join() throws InterruptedException;
public final synchronized void join(long millis) throws InterruptedException;
public final synchronized void join(long millis, int nanos) throws InterruptedException;

无参数的join表示当前线程一直等到另一个线程运行完毕,这种情况下当前线程会处于Wating状态;带参数的表示当前线程只等待指定的时间,这种情况下当前线程会处于Time Waiting状态。当前线程通过调用join方法进入Time Waiting或Waiting状态后,会释放已经获取的锁。实际上,join方法内部调用了Object类的实例方法wait,关于这个方法我们下面会具体介绍。

  • yield方法,这是一个静态方法,作用是让当前线程“让步”,目的是为了让优先级不低于当前线程的线程有机会运行,这个方法不会释放锁。
  • interrupt方法,这是一个实例方法。每个线程都有一个中断状态标识,这个方法的作用就是将相应线程的中断状态标记为true,这样相应的线程调用isInterrupted方法就会返回true。通过使用这个方法,能够终止那些通过调用可中断方法进入阻塞状态的线程。常见的可中断方法有sleep、wait、join,这些方法的内部实现会时不时的检查当前线程的中断状态,若为true会立刻抛出一个InterruptedException异常,从而终止当前线程。

    以下这幅图很好的诠释了随着各种方法的调用,线程在不同的状态之间的切换(图片来源:http://www.cnblogs.com/dolphin0520/p/3920357.html):

    Java中的多线程技术全面详解

wait方法与notify/notifyAll方法

wait方法

wait方法是Object类中定义的实例方法。在指定对象上调用wait方法能够让当前线程进入阻塞状态(前提时当前线程持有该对象的内部锁(monitor)),此时当前线程会释放已经获取的那个对象的内部锁,这样一来其他线程就可以获取这个对象的内部锁了。当其他线程获取了这个对象的内部锁,进行了一些操作后可以调用notify方法来唤醒正在等待该对象的线程。

notify/notifyAll方法

notify/notifyAll方法也是Object类中定义的实例方法。它俩的作用是唤醒正在等待相应对象的线程,区别在于前者唤醒一个等待该对象的线程,而后者唤醒所有等待该对象的线程。这么说比较抽象,下面我们来举一个具体的例子来说明以下wait和notify/notifyAll的用法。请看以下代码(转自[Java并发编程:线程间协作的两种方式]

1 public class Test {
2   private int queueSize = 10;
3   private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
4 
5   public static void main(String[] args) {
6    Test test = new Test();
7    Producer producer = test.new Producer();
8    Consumer consumer = test.new Consumer();
9
10   producer.start();
11   consumer.start();
12  }
13
14  class Consumer extends Thread{
15
16   @Override
17   public void run() {
18     consume();
19   }
20
21   private void consume() {
22     while(true){
23       synchronized (queue) {
24         while(queue.size() == 0){
25           try {
26             System.out.println("队列空,等待数据");
27             queue.wait();
28           } catch (InterruptedException e) {
29             e.printStackTrace();
30             queue.notify();
31           }
32         }
33         queue.poll(); //每次移走队首元素
34         queue.notify();
35         System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
36       }
37     }
38   }
39  }
40
41  class Producer extends Thread{
42
43   @Override
44   public void run() {
45     produce();
46   }
47
48   private void produce() {
49     while(true){
50       synchronized (queue) {
51         while(queue.size() == queueSize){
52           try {
53             System.out.println("队列满,等待有空余空间");
54             queue.wait();
55           } catch (InterruptedException e) {
56             e.printStackTrace();
57             queue.notify();
58           }
59         }
60         queue.offer(1); //每次插入一个元素
61         queue.notify();
62         System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
63      }
64    }
65   }
66  }
67 }

以上代码描述的是经典的“生产者-消费者”问题。Consumer类代表消费者,Producer类代表生产者。在生产者进行生产之前(对应第48行的produce方法),会获取queue的内部锁(monitor)。然后判断队列是否已满,若满了则无法再生产,所以在第54行调用queue.wait方法,从而等待在queue对象上。(释放了queue的内部锁)此时生产者能够能够获取queue的monitor从而进入第21行的consume方法,这样一来它就会通过第33行的queue.poll方法进行消费,于是队列不再满了,接着它在第34行调用queue.notify方法来通知正在等待的生产者,生产者就会从刚才阻塞的wait方法(第54行)中返回。
同理,当队列空时,消费者也会等待(第27行)生产者来唤醒(第61行)。
await方法和signal/signalAll方法是wait方法和notify/notifyAll方法的升级版,在后文中会具体介绍它们与wait、notify/notifyAll之间的关系。

如何保证线程安全

所谓线程安全,指的是当多个线程并发访问数据对象时,不会造成对数据对象的“破坏”。保证线程安全的一个基本思路就是让访问同一个数据对象的多个线程进行“排队”,一个接一个的来,这样就不会对数据造成破坏,但带来的代价是降低了并发性。

race condition(竟争条件)

当两个或两个以上的线程同时修改同一数据对象时,可能会产生不正确的结果,我们称这个时候存在一个竞争条件(race condition)。在多线程程序中,我们必须要充分考虑到多个线程同时访问一个数据时可能出现的各种情况,确保对数据进行同步存取,以防止错误结果的产生。请考虑以下代码:

1
2
3
4
5
6
public class Counter {
  private long count = 0;
  public void add(long value) {
    this.count = this.count + value;
  }
}

我们注意一下改变count值的那一行,通常这个操作不是一步完成的,它大概分为以下三步:

  • 第一步,把count的值加载到寄存器中;
  • 第二步,把相应寄存器的值加上value的值;
  • 第三步,把寄存器的值写回count变量。

我们可以编译以上代码然后用javap查看下编译器为我们生成的字节码:

Java中的多线程技术全面详解

我们可以看到,大致过程和我们以上描述的基本一样。那么我们考虑下面这样一个场景:假设count的初值为0,首先线程A加载了count到寄存器中,并且加上了1,而就当它要写回之前,线程B进入了add方法,它加载了count到寄存器中(由于此时线程A还没有把count写回,因此count还是0),并加上了2,然后线程B写回了count。在线程B完成了写回后,线程调度程序调度了线程A,线程A也写回了count。注意,此时count的值为1而不是我们希望的三。我们不希望一个线程在执行add方法时被其他线程打断,因为这会造成数据的破坏。我们希望的情况是这样的:线程A完整执行完毕add方法后,待count变量的值更新为1时,线程B开始执行add方法,在线程B完整执行完毕之前, 没有别的线程能够打断它,若有别的线程想调用add,也得等线程B执行完毕写回count值后。

像add这种方法代码所在的内存区,我们称之为临界区(critical area)。对于临界区,在同一时刻我们只希望有一个线程能够访问它,我们希望在一个线程进入临界区后把通往这个区的门“上锁”,离开后把门"解锁“,这样当一个线程执行临界区的代码时其他想要进来的线程只能在门外等着,这样可以保证了多个线程共享的数据不会被破坏。下面我们来介绍下为临界区“上锁”的方法。

锁对象

Java类库中为我们提供了能够给临界区“上锁”的ReentrantLock类,它实现了Lock接口,在进一步介绍ReentrantLock类之前,我们先来看一下Lock接口的定义:

1
2
3
4
5
6
7
8
public interface Lock {
  void lock();
  void lockInterruptibly() throws InterruptedException;
  boolean tryLock();
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  void unlock();
  Condition newCondition();
}

我们来分别介绍下Lock接口中的方法:

  • lock方法用来获取锁,在锁被占用时它会一直阻塞,并且这个方法不能被中断;
  • lockInterruptibly方法在获取不到锁时也会阻塞,它与lock方法的区别在于阻塞在该方法时可以被中断;
  • tryLock方法也是用来获取锁的,它的无参版本在获取不到锁时会立刻返回false,它的计时等待版本会在等待指定时间还获取不到锁时返回false,计时等待的tryLock在阻塞期间也能够被中断。使用tryLock方法的典型代码如下:
1
2
3
4
5
6
7
8
9
if (myLock.tryLock()) {
    try {
      
    } finally {
      myLock.unlock();
    }
} else {
  //做其他的工作
}
  • unlock方法用来释放锁;
  • newCondition方法用来获取当前锁对象相关的条件对象,这个在下文我们会具体介绍。

ReentrantLock类是唯一一个Lock接口的实现类,它的意思是可重入锁,关于“可重入”的概念我们下面会进行介绍。有了上面的介绍,理解它的使用方法就很简单了,比如下面的代码即完成了给add方法“上锁”:

1
2
3
4
5
6
7
8
9
Lock myLock = new ReentrantLock();
public void add(long value) {
  myLock.lock();
  try {
    this.count = this.count + value;
  } finally {
    myLock.unlock();
  }
}

从以上代码可以看到,使用ReentrantLock对象来上锁时只需要先获取一个它的实例。然后通过lock方法进行上锁,通过unlock方法进行解锁。注意,我们使用了一个try-finally块,以确保即使发生异常也总是会解锁,不然其他线程会一直无法执行add方法。当一个线程执行完“myLock.lock()”时,它就获得了一个锁对象,这就相当于它给临界区上了锁,其他线程都无法进来,只有这个线程执行完“myLock.unlock()"时,释放了锁对象,其他线程才能再通过“myLock.lock()"获得锁对象,从而进入临界区。也就是说,当一个线程获取了锁对象后,其他尝试获取锁对象的线程都会被阻塞,进入Blocked状态,直至获取锁对象的线程释放了锁对象。
有了锁对象,尽管线程A在执行add方法的过程中被线程调度程序剥夺了运行权,其他的线程也进入不了临界区,因为线程A还在持有锁对象。这样一来,我们就很好的保护了临界区。
ReentrantLock锁是可重入的,这意味着线程可以重复获得已经持有的锁,每个锁对象内部都持有一个计数,每当线程获取依次锁对象,这个计数就加1,释放一次就减1。只有当计数值变为0时,才意味着这个线程释放了锁对象,这时其他线程才可以来获取。

条件对象

有些时候,线程进入临界区后不能立即执行,它需要等某一条件满足后才开始执行。比如,我们希望count值大于5的时候才增加它的值,我们最先想到的是加个条件判断:

1
2
3
4
5
public void add(int value) {
  if (this.count > 5) {
    this.count = this.count + value;
  }
}
然而上面的代码存在一个问题。假设线程A执行完了条件判断并的值count值大于5,而在此时该线程被线程调度程序中断执行,转而调度线程B,线程B对同一counter对象的count值进行了修改,使得它不再大于5,这时线程调度程序又来调度线程A,线程A刚才判定了条件为真,所以会执行add方法,尽管此时count值已不再大于5。显然,这与我们所希望的情况的不符的。对于这种问题,我们想到了可以在条件判断前后加锁与解锁:
1
2
3
4
5
6
7
8
9
10
11
public void add(int value) {
  myLock.lock();
  try {
    while (counter.getCount() <= 5) {
      //等待直到大于5
    }
    this.count = this.count + value;
  } finally {
    myLock.unlock();
  }
}
在以上代码中,若线程A发现count值小于等于5,它会一直等到别的线程增加它的值直到它大于5。然而线程A此时持有锁对象,其他线程无法进入临界区(add方法内部)来改变count的值,所以当线程A进入临界区时若count小于等于5,线程A会一直在循环中等待,其他的线程也无法进入临界区。这种情况下,我们可以使用条件对象来管理那些已经获得了一个锁却不能开始干活的线程。一个锁对象可以有一个或多个相关的条件对象,在锁对象上调用newCondition方法就可以获得一个条件对象。比如我们可以为“count值大于5”获得一个条件对象:
1
Condition enoughCount = myLock.newCondition();

然后,线程A发现count值不够时,调用“enoughCount.await()”即可,这时它便会进入Waiting状态,放弃它持有的锁对象,以便其他线程能够进入临界区。当线程B进入临界区修改了count值后,发现了count值大于5,线程B可通过"enoughCount.signalAll()"来“唤醒所有等待这一条件满足的线程(这里只有线程A)。此时线程A会从Waiting状态进入Runnable状态。当线程A再次被调度时,它便会从await方法返回,重新获得锁并接着刚才继续执行。注意,此时线程A会再次测试条件是否满足,若满足则执行相应操作。也就是说signalAll方法仅仅是通知线程A一声count的值可能大于5了,应该再测试一下。还有一个signal方法,会随机唤醒一个正在等待某条件的线程,这个方法的风险在于若随机唤醒的线程测试条件后发现仍然不满足,它还是会再次进入Waiting状态,若以后不再有线程唤醒它,它便不能再运行了。

synchronized关键字

Java中的每个对象都有一个内部锁,这个内部锁也被称为监视器(monitor);每个类内部也有一个锁,用于控制多个线程对其静态成员的并发访问。若一个实例方法用synchronized关键字修饰,那么这个对象的内部锁会“保护”此方法,我们称此方法为同步方法。这意味着只有获取了该对象内部锁的线程才能够执行此方法。也就是说,以下的代码:

1
2
3
public synchronized void add(int value) {
  ...
}

等价于:

1
2
3
4
5
6
7
8
public void add(int value) {
  this.innerLock.lock();
  try {
    ...
  } finally {
    this.innerLock.unlock();
  }
}

这意味着,我们通过给add方法加上synchronized关键字即可保护它,加锁解锁的工作不需要我们再手动完成。对象的内部锁在同一时刻只能由一个线程持有,其他尝试获取的线程都会被阻塞直至该线程释放锁,这种情况下被阻塞的线程无法被中断

内部锁对象只有一个相关条件。wait方法添加一个线程到这个条件的等待集中notifyAll / notify方法会唤醒等待集中的线程。也就是说wait() / notify()等价于enoughCount.await() / enoughCount.signAll()。以上add方法我们可以这么实现:

1
2
3
4
5
6
7
public synchronized void add(int value) {
  while (this.count <= 5) {
    wait();
  }
  this.count += value;
  notifyAll();
}

这份代码显然比我们上面的实现要简洁得多,实际开发中也更加常用。

我们也可以用synchronized关键字修饰静态方法,这样的话,进入该方法的线程或获取相关类的Class对象的内部锁。例如,若Counter中含有一个synchronized关键字修饰的静态方法,那么进入该方法的线程会获得Bank.class的内部锁。这意味着其他任何线程不能执行Counter类的任何同步静态方法。

对象内部锁存在一些局限性:

  • 不能中断一个正在试图获取锁的线程;
  • 试图获取锁时不能设定超时;
  • 每个锁仅有一个相关条件。

那么我们究竟应该使用Lock/Condition还是synchronized关键字呢?答案是能不用尽量都不用,我们应尽可能使用java.util.concurrent包中提供给我们的相应机制(后面会介绍)。

当我们要在synchronized关键字与Lock间做出选择时我们需要考虑以下几点:

  • 若我们需要多个线程进行读操作,应该使用实现了Lock接口的ReentrantReadWriteLock类,这个类允许多个线程同时读一个数据对象(这个类的使用后面会介绍);
  • 当我们需要Lock/Condition的特性时,应该考虑使用它(比如多个条件还有计时等待版本的await函数);
  • 一般场景我们可以考虑使用synchronized关键字,因为它的简洁性一定程度上能够减少出错的可能。关于synchronized关键字需要注意的一点是:synchronized方法或者synchronized代码块出现异常时,Java虚拟机会自动释放当前线程已获取的锁。
同步阻塞

上面我们提到了一个线程调用synchronized方法可以获得对象的内部锁(前提是还未被其他线程获取),获得对象内部锁的另一种方法就是通过同步阻塞:

1
2
3
synchronized (obj) {
  //临界区
}

一个线程执行上面的代码块便可以获取obj对象的内部锁,直至它离开这个代码块才会释放锁。

我们经常会看到一种特殊的锁,如下所示:

1
2
3
4
5
6
7
public class Counter {
  private Object lock = new Object();
  synchronized (lock) {
    //临界区
  }
  ...
}

那么这种使用这种锁有什么好处呢?我们知道Counter对象只有一个内部锁,这个内部锁在同一时刻只能被一个对象持有,那么设想Counter对象中定义了两个synchronized方法。在某一时刻,线程A进入了其中一个synchronized方法并获取了内部锁,此时线程B尝试进去另一个synchronized方法时由于对象内部锁还没有被线程A释放,因此线程B只能被阻塞。然而我们的两个synchronized方法是两个不同的临界区,它们不会相互影响,所以它们可以在同一时刻被不同的线程所执行。这时我们就可以使用如上面所示的显式的锁对象,它允许不同的方法同步在不同的锁上。

volatile域

有时候,仅仅为了同步一两个实例域就使用synchronized关键字或是Lock/Condition,会造成很多不必要的开销。这时候我们可以使用volatile关键字,使用volatile关键字修饰一个实例域会告诉编译器和虚拟机这个域可能会被多线程并发访问,这样编译器和虚拟机就能确保它的值总是我们所期望的。
volatile关键字的实现原理大致是这样的:我们在访问内存中的变量时,通常都会把它缓存在寄存器中,以后再需要读它的值时,只需从相应寄存器中读取,若要对该变量进行写操作,则直接写相应寄存器,最后写回该变量所在的内存单元。若线程A把count变量的值缓存在寄存器中,并将count加2(将相应寄存器的值加2),这时线程B被调度,它读取count变量加2后并写回。然后线程A又被调度,它会接着刚才的操作,也就是会把count值写回,此时线程A是直接把寄存器中的值写回count所在单元,而这个值是过期的。若count被volatile关键字修饰,这个问题便可被圆满解决。volatile变量有一个性质,就是任何时候读取它的值时,都会直接去相应内存单元读取,而不是读取缓存在寄存器中的值。这样一来,在上面那个场景中,线程A把count写回时,会从内存中读取count最新的值,从而确保了count的值总是我们所期望的。
关于volatile关键字更加详细的论述请参考这里:Java并发编程:volatile关键字解析 ,感谢海子同我们分享了这篇精彩博文:)

死锁

假设现在进程中只有线程A和线程B这两个线程,考虑下面这样一种情形:
线程A获取了counterA对象的内部锁,线程B获取了counterB对象的内部锁。而线程A只有在获取counterB的内部锁后才能继续执行,线程B只有在获取线程A的内部锁后才能继续执行。这样一来,两个线程在互相等待对方释放锁从而谁也没法继续执行,这种现象就叫做死锁(deadlock)

除了以上情况,还有一种类似的死锁情况是两个线程获取锁后都不满足条件从而进入条件的等待集中,相互等待对方唤醒自己。

Java没有为解决死锁提供内在机制,因此我们只有在开发时格外小心,以避免死锁的发生。关于分析定位程序中的死锁,大家可以参考这篇文章:Java Deadlock Example and How to analyze deadlock situation

读/写锁

若很多线程从一个内存区域读取数据,但其中只有极少的一部分线程会对其中的数据进行修改,此时我们希望所有Reader线程共享数据,而所有Writer线程对数据的访问要互斥。我们可以使用读/写锁来达到这一目的。
Java中的读/写锁对应着ReentrantReadWriteLock类,它实现了ReadWriteLock接口,这个接口的定义如下:

1
2
3
4
5
6
7
8
9
10
public interface ReadWriteLock {
  /** * Returns the lock used for reading.
    * * @return the lock used for reading
    */
  Lock readLock();
  /** * Returns the lock used for writing.
    * * @return the lock used for writing
    */
  Lock writeLock();
}

我们可以看到这个接口就定义了两个方法,其中readLock方法用来获取一个“读锁”,writeLock方法用来获取一个“写锁”。

ReentrantReadWriteLock类的使用步骤通常如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//构造一个ReentrantReadWriteLock对象
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//分别从中“提取”读锁和写锁
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();
//对所有的Reader线程加读锁
readLock.lock();
try {
  //读操作可并发,但写操作会互斥
} finally {
  readLock.unlock();
}
//对所有的Writer线程加写锁
writeLock.lock();
try {
  //排斥所有其他线程的读和写操作
} finally {
  writeLock.unlock();
}

在使用ReentrantReadWriteLock类时,我们需要注意以下两点:

  • 若当前已经有线程占用了读锁,其他要申请写锁的线程需要占用读锁的线程释放了读锁才能申请成功;
  • 若当前已经有线程占用了写锁,其他要申请读锁或写锁的线程都需要等待占用写锁的线程释放了写锁才能申请成功。

阻塞队列

以上我们所介绍的都属于Java并发机制的底层基础设施。在实际编程我们应该尽量避免使用以上介绍的较为底层的机制,而使用Java类库中提供给我们封装好的较高层次的抽象。对于许多同步问题,我们可以通过使用一个或多个队列来解决:生产者线程向队列中插入元素,消费者线程则取出他们。考虑一下我们最开始提到的Counter类,我们可以通过队列来这样解决它的同步问题:增加计数值的线程不能直接访问Counter对象,而是把add指令对象插入到队列中,然后由另一个可访问Counter对象的线程从队列中取出add指令对象并执行add操作(只有这个线程能访问Counter对象,因此无需采取额外措施来同步)。
当试图向满队列中添加元素或者向空队列中移除元素时,阻塞队列(blocking queue)会导致线程阻塞。通过阻塞队列,我们可以按以下模式来工作:工作者线程可以周期性的将中间结果放入阻塞队列中,其他线程可取出中间结果并进行进一步操作。若前者工作的比较慢(还没来得及向队列中插入元素),后者会等待它(试图从空队列中取元素从而阻塞);若前者运行的快(试图向满队列中插元素),它会等待其他线程。阻塞队列提供了以下方法:

  • add方法:添加一个元素。若队列已满,会抛出IllegalStateException异常。
  • element方法:返回队列的头元素。若队列为空,会抛出NoSuchElementException异常。
  • offer方法:添加一个元素,若成功则返回true。若队列已满,则返回false。
  • peek方法:返回队列的头元素。若队列为空,则返回null。
  • poll方法:删除并返回队列的头元素。若队列为空,则返回null。
  • put方法:添加一个元素。若队列已满,则阻塞。
  • remove方法:移除并返回头元素。若队列为空,会抛出NoSuchElementException。
  • take方法:移除并返回头元素。若队列为空,则阻塞。

java.util.concurrent包提供了以下几种阻塞队列:

  • LinkedBlockingQueue是一个基于链表实现的阻塞队列。默认容量没有上限,但也有可以指定最大容量的构造方法。它有的“双端队列版本”为LinkedBlockingDeque。
  • ArrayBlockingQueue是一个基于数组实现的阻塞队列,它在构造时需要指定容量。它还有一个构造方法可以指定一个公平性参数,若这个参数为true,那么等待了最长时间的线程会得到优先处理(指定公平性参数会降低性能)。
  • PriorityBlockingQueue是一个基于堆实现的带优先级的阻塞队列。元素会按照它们的优先级被移除队列。

下面我们来看一个使用阻塞队列的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class BlockingQueueTest {
  private int size = 20;
  private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(size);
  public static void main(String[] args) {
    BlockingQueueTest test = new BlockingQueueTest();
    Producer producer = test.new Producer();
    Consumer consumer = test.new Consumer();
    producer.start();
    consumer.start();
  }
  class Consumer extends Thread{
    @Override
    public void run() {
      while(true){
        try {
          //从阻塞队列中取出一个元素
          queue.take();
          System.out.println("队列剩余" + queue.size() + "个元素");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
 
  class Producer extends Thread{
    @Override public void run() {
      while (true) {
        try {
          //向阻塞队列中插入一个元素
          queue.put(1);
          System.out.println("队列剩余空间:" + (size - queue.size()));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

在以上代码中,我们有一个生产者线程不断地向一个阻塞队列中插入元素,同时消费者线程从这个队列中取出元素。若生产者生产的比较快,消费者取的比较慢导致队列满,此时生产者再尝试插入时就会阻塞在put方法中,直到消费者取出一个元素;反过来,若消费者消费的比较快,生产者生产的比较慢导致队列空,此时消费者尝试从中取出时就会阻塞在take方法中,直到生产者插入一个元素。

执行器

创建一个新线程涉及和操作系统的交互,因此会产生一定的开销。在有些应用场景下,我们会在程序中创建大量生命周期很短的线程,这时我们应该使用线程池(thread pool)。通常,一个线程池中包含一些准备运行的空闲线程,每次将Runnable对象交给线程池,就会有一个线程执行run方法。当run方法执行完毕时,线程不会进入Terminated状态,而是在线程池中准备等下一个Runnable到来时提供服务。使用线程池统一管理线程可以减少并发线程的数目,线程数过多往往会在线程上下文切换上以及同步操作上浪费过多时间。

执行器类(java.util.concurrent.Executors)提供了许多静态工厂方法来构建线程池。

线程池

在Java中,线程池通常指一个ThreadPoolExecutor对象,ThreadPoolExecutor类继承了AbstractExecutorService类,而AbstractExecutorService抽象类实现了ExecutorService接口,ExecutorService接口又扩展了Executor接口。也就是说,Executor接口是Java中实现线程池的最基本接口。我们在使用线程池时通常不直接调用ThreadPoolExecutor类的构造方法,二回使用Executors类提供给我们的静态工厂方法,这些静态工厂方法内部会调用ThreadPoolExecutor的构造方法,并为我们准备好相应的构造参数。

Executors类中的以下三个方法会返回一个实现了ExecutorService接口的ThreadPoolExecutor类的对象:

1
2
3
4
5
6
//返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s后终止线程
ExecutorService newCachedThreadPool()
//返回一个线程池,线程数目由threads参数指明
ExecutorService newFixedThreadPool(int threads) ExecutorService
//返回只含一个线程的线程池,它在一个单一的线程中依次执行各个任务
ExecutorService  newSingleThreadExecutor()

对于newCachedThreadPool方法返回的线程池:对每个任务,若有空闲线程可用,则立即让它执行任务;若没有可用的空闲线程,它就会创建一个新线程并加入线程池中;
newFixedThreadPool方法返回的线程池里的线程数目由创建时指定,并一直保持不变。若提交给它的任务多于线程池中的空闲线程数目,那么就会把任务放到队列中,当其他任务执行完毕后再来执行它们;
newSingleThreadExecutor会返回一个大小为1的线程池,由一个线程执行提交的任务。

以下方法可将一个Runnable对象或Callable对象提交给线程池:

1
2
3
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)

调用submit方法会返回一个Future对象,可通过这个对象查询该任务的状态。我们可以在这个Future对象上调用isDone、cancle、isCanceled等方法(Future接口会在下面进行介绍)。第一个submit方法提交一个Callable对象到线程池中;第二个方法提交一个Runnable对象,并且Future的get方法在完成的时候返回指定的result对象。

当我们使用完线程池时,就调用shutdown方法,该方法会启动该线程池的关闭例程。被关闭的线程池不能再接受新的任务,当关闭前已存在的任务执行完毕后,线程池死亡。shutdownNow方法可以取消线程池中尚未开始的任务并尝试中断所有线程池中正在运行的线程。

在使用线程池时,我们通常应该按照以下步骤来进行:

  • 调用Executors中相关方法构建一个线程池;
  • 调用submit方法提交一个Runnable对象或Callable对象到线程池中;
  • 若想要取消一个任务,需要保存submit返回的Future对象;
  • 当不再提交任何任务时,调用shutdown方法。

关于线程池更加深入及详细的分析,大家可以参考这篇博文:http://www.cnblogs.com/dolphin0520/p/3932921.html

预定执行

ScheduledExecutorService接口含有为预定执行(Scheduled Execution)或重复执行的任务专门设计的方法。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法会返回实现了ScheduledExecutorService接口的对象。可以使用以下方法来预定执行的任务:

1
2
3
4
5
6
7
//以下两个方法预定在指定时间过后执行任务
ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit)
//在指定的延迟(initialDelay)过后,周期性地执行给定任务
SchedukedFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)
//在指定延迟(initialDelay)过后周期性的执行任务,每两个任务间的间隔为delay指定的时间
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
控制任务组

对ExecutorService对象调用invokeAny方法可以把一个Callable对象集合提交到相应的线程池中执行,并返回某个已经完成的任务的结果,该方法的定义如下:

1
2
T invokeAny(Collection<Callable<T>> tasks)
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
该方法可以指定一个超时参数。这个方法的不足在于我们无法知道它返回的结果是哪个任务执行的结果。如果集合中的任意Callable对象的执行结果都能满足我们的需求的话,使用invokeAny方法是很好的。
invokeAll方法也会提交Callable对象集合到相应的线程池中,并返回一个Future对象列表,代表所有任务的解决方案。该方法的定义如下:
1
2
List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

Callable与Future

我们之前提到了创建线程的两种方式,它们有一个共同的缺点,那就是异步方法run没有返回值,也就是说我们无法直接获取它的执行结果,只能通过共享变量或者线程间通信等方式来获取。好消息是通过使用Callable和Future,我们可以方便的获得线程的执行结果。
Callable接口与Runnable接口类似,区别在于它定义的异步方法call有返回值。Callable接口的定义如下:

1
2
3
public interface Callable<V> {
  V call() throws Exception;
}

类型参数V即为异步方法call的返回值类型。

Future可以对具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成以及获取结果。可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。Future接口的定义如下:

1
2
3
4
5
6
7
public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled();
  boolean isDone();
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,每个方法的作用如下:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false(即如果取消已经完成的任务会返回false);如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
    isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会阻塞,一直等到任务执行完才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说Future提供了三种功能

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

Future接口的实现类是FutureTask:

1
public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,这个接口的定义如下:

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
  void run();
}

可以看到RunnableFuture接口扩展了Runnable接口和Future接口。 FutureTask类有如下两个构造器:

1
2
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)

FutureTask通常与线程池配合使用,通常会创建一个包装了Callable对象的FutureTask实例,并用submit方法将它提交到一个线程池去执行,我们可以通过FutureTask的get方法获取返回结果。

同步容器与并发容器

同步容器

Java中的同步容器指的是线程安全的集合类,同步容器主要包含以下两类:
通过Collections类中的相应方法把普通容器类包装成线程安全的版本;
Vector、HashTable等系统为我们封装好的线程安全的集合类。

相比与并发容器(下面会介绍),同步容器存在以下缺点:

  • 对于并发读访问的支持不够好;
  • 由于内部多采用synchronized关键字实现,所以性能上不如并发容器;
    对同步容器进行迭代的同时修改它的内容,会报ConcurrentModificationException异常。

关于同步容器更加详细的介绍请参考这里:http://www.cnblogs.com/dolphin0520/p/3933404.html

并发容器

并发容器相比于同步容器,具有更强的并发访问支持,主要体现在以下方面:

  • 在迭代并发容器时修改其内容并不会抛出ConcurrentModificationException异常;
  • 在并发容器的内部实现中尽量避免了使用synchronized关键字,从而增强了并发性。

Java在java.util.concurrent包中提供了主要以下并发容器类:

  • ConcurrentHashMap,这个并发容器是为了取代同步的HashMap;

  • CopyOnWriteArrayList,使用这个类在迭代时进行修改不抛异常;

  • ConcurrentLinkedQuerue是一个非阻塞队列;

  • ConcurrentSkipListMap用于在并发环境下替代SortedMap;

  • ConcurrentSkipSetMap用于在并发环境下替代SortedSet。

    关于这些类的具体使用,大家可以参考官方文档及相关博文。通常来说,并发容器的内部实现做到了并发读取不用加锁,并发写时加锁的粒度尽可能小。

同步器(Synchronizer)

java.util.concurrent包提供了几个帮助我们管理相互合作的线程集的类,这些类的主要功能和适用场景如下:

  • CyclicBarrier:它允许线程集等待直至其中预定数目的线程到达某个状态(这个状态叫公共障栅(barrier)),然后可以选择执行一个处理障栅的动作。适用场景:当多个线程都完成某操作,这些线程才能继续执行时,或都完成了某操作后才能执行指定任务时。对CyclicBarrier对象调用await方法即可让相应线程进入barrier状态,等到预定数目的线程都进入了barrier状态后,这些线程就可以继续往下执行了
  • CountDownLatch:允许线程集等待直到计数器减为0。适用场景:当一个或多个线程需要等待直到指定数目的事件发生。举例来说,假如主线程需要等待N个子线程执行完毕才继续执行,就可以使用CountDownLatch来实现,需要用到CountDownLatch的以下方法:
1
2
3
4
5
6
//调用该方法的线程会进入阻塞状态,直到count值为0才继续执行
1 public void await() throws InterruptedException { };
 //await方法的计时等待版本
2 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将CountDownLatch对象count值(初始化时作为参数传入构造方法)减1
3 public void countDown() { };
  • Exchanger:允许两个线程在要交换的对象准备好时交换对象。适用场景:当两个线程工作在同一数据结构的两个实例上时,一个向实例中添加数据,另一个从实例中移除数据。
  • Semaphore:允许线程集等待直到被允许继续运行为止。适用场景:限制同一时刻对某一资源并发访问的线程数,初始化Semaphore需要指定许可的数目,线程要访问受限资源时需要获取一个许可,当所有许可都被获取,其他线程就只有等待许可被释放后才能获取。
  • SynchronousQueue:允许一个线程把对象交给另一个线程。适用场景:在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个线程。

关于CountDownLatch、CyclicBarrier、Semaphore的具体介绍和使用示例大家可以参考这篇博文:Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

 
我有一个微信公众号,经常会分享一些Java技术相关的干货;如果你喜欢我的分享,可以用微信搜索“Java团长”或者“javatuanzhang”关注。

参考:

上一篇:多线程高级篇1 — JUC — 只弄到处理高并发集合问题


下一篇:Java中使用CountDownLatch进行多线程同步