20210614. 并发编程 - 拉勾教育

并发编程

多线程&并发设计原理

多线程回顾

ThreadRunnable

创建执行线程有两种方法:

  • 扩展 Thread
  • 实现 Runnable 接口
Java 中的线程:特征和状态
  • 所有的 Java 程序,不论并发与否,都有一个名为主线程的 Thread 对象。执行该程序时, Java 虚拟机( JVM )将创建一个新 Thread 并在该线程中执行 main() 方法。这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程

  • Java 中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息。但是必须使用同步 避免数据竞争

  • Java 中的所有线程都有一个优先级,这个整数值介于 Thread.MIN_PRIORITY ( 1 )和 Thread.MAX_PRIORITY ( 10 )之间,默认优先级是 Thread.NORM_PRIORITY ( 5 )。线程的执行顺序并没有保证,通常,较高优先级的线程将在较低优先级的线程之前执行。

  • 在 Java 中,可以创建两种线程:

    • 守护线程
    • 非守护线程

    区别在于它们如何影响程序的结束

    Java 程序结束执行过程的情形:

    • 程序执行 Runtime 类的 exit() 方法, 而且用户有权执行该方法。
    • 应用程序的所有非守护线程均已结束执行,无论是否有 正在运行的守护线程

    守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务。在线程 start 之前调用 isDaemon() 方法检查线程是否为守护线程,也可以使用 setDaemon() 方法将某个线程确立为守护线程。

  • Thread.States 类中定义线程的状态如下:

    • NEWThread 对象已经创建,但是还没有开始执行。
    • RUNNABLEThread 对象正在 Java 虚拟机中运行。
    • BLOCKED : Thread 对象正在等待锁定。
    • WAITINGThread 对象正在等待另一个线程的动作。
    • TIME_WAITINGThread 对象正在等待另一个线程的操作,但是有时间限制。
    • TERMINATEDThread 对象已经完成了执行。

    getState() 方法获取 Thread 对象的状态。

    在给定时间内, 线程只能处于一个状态。这些状态是 JVM 使用的状态,不能映射到操作系统的线程状态

Thread 类和 Runnable 接口

Runnable 接口只定义了一种方法: run() 方法。这是每个线程的主方法。当执行 Thread.start() 方法启动新线程时,它将调用 run() 方法。

Thread 类其他常用方法:

  • 获取和设置 Thread 对象信息的方法:
    • getId() :该方法返回Thread对象的标识符。该标识符是在钱程创建时分配的一个正整数。在线程的整个生命周期中是唯一且无法改变的。
    • getName() / setName() :这两种方法允许你获取或设置 Thread 对象的名称。这个名称是一个 String 对象,也可以在 Thread 类的构造函数中建立。
    • getPriority() / setPriority() :你可以使用这两种方法来获取或设置 Thread 对象的优先级。
    • isDaemon() / setDaemon() :这两种方法允许你获取或建立 Thread 对象的守护条件。
    • getState() :该方法返回 Thread 对象的状态。
  • interrupt() :中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记。
  • interrupted() :判断目标线程是否被中断,但是将清除线程的中断标记。
  • isinterrupted() :判断目标线程是否被中断,不会清除中断标记。
  • sleep(long ms) :该方法将线程的执行暂停 ms 时间。
  • join() :暂停线程的执行,直到调用该方法的线程执行结束为止。可以使用该方法等待另一个 Thread 对象结束。
  • setUncaughtExceptionHandler() :当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器。
  • currentThread()Thread 类的静态方法,返回实际执行该代码的 Thread 对象
  • yield() :线程让步,让出 CPU 执行时间,用于 测试 多线程并发
Callable

Callable 接口是一个与 Runnable 接口非常相似的接口。Callable 接口的主要特征如下:

  • 接口。有简单类型参数,与 call() 方法的返回类型相对应。
  • 声明了 call() 方法。执行器运行任务时,该方法会被执行器执行。它必须返回声明中指定类型的对象。
  • call() 方法可以抛出任何一种校验异常。可以实现自己的执行器并重载 ThreadPoolExecutor #afterExecute() 方法来处理这些异常。

synchronized 关键字

如果一份资源需要多个线程同时访问,需要给该资源加锁。加锁之后,可以保证同一时间只能有一个线程访问该资源。资源可以是一个变量、一个对象或一个文件等。

锁是一个 “对象” ,作用如下:

  • 这个对象内部得有一个标志位( state 变量),记录自己有没有被某个线程占用。最简单的情况是这个 state01 两个取值, 0 表示没有线程占用这个锁, 1 表示有某个线程占用了这个锁。
  • 如果这个对象被某个线程占用,记录这个线程的 thread ID 。
  • 这个对象维护一个 thread id list ,记录其他所有阻塞的、等待获取拿这个锁的线程。在当前线程释放锁之后从这个 thread id list 里面取一个线程唤醒。

要访问的共享资源本身也是一个对象,这两个对象可以合成一个对象。资源和锁合二为一,使得在 Java 里面, synchronized 关键字可以加在任何对象的成员上面。这意味着,这个对象既是共享资源,同时也具备“锁”的功能!

锁的实现原理:在对象头里,有一块数据叫 Mark Word 。在 64 位机器上, Mark Word 是 8 字节( 64 位)的,这 64 位中有 2 个重要字段:锁标志位和占用该锁的 thread ID 。因为不同版本的 JVM 实现,对象头的数据结构会有各种差异。

wait 与 notify

synchronized 关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此, wait()notify() 只能放在 Object 里面了。

wait() 的内部,会先释放锁 obj1 ,然后进入阻塞状态,之后,它被另外一个线程用 notify() 唤醒,重新获取锁!其次, wait() 调用完成后,执行后面的业务逻辑代码,然后退出 synchronized 同步块,再次释放锁。 wait() 内部的伪代码如下:

wait() {
    // 释放锁
    // 阻塞,等待被其他线程notify
    // 重新获取锁
}

wait()notify() 的问题:生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。原因在于 wait()notify() 所作用的对象和 synchronized 所作用的对象是同一个,只能有一个对象,无法区分队列空和列队满两个条件。这正是 Condition 要解决的问题。

InterruptedExceptioninterrupt() 方法

只有那些声明了会抛出 InterruptedException 的函数才会抛出异常

轻量级阻塞与重量级阻塞

能够被中断的阻塞称为轻量级阻塞,对应的线程状态是 WAITING 或者 TIMED_WAITING ;而像 synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是 BLOCKED 。如图所示:调用不同的方法后,一个线程的状态迁移过程。java.lang.Thread.State

20210614. 并发编程 - 拉勾教育

初始线程处于 NEW 状态,调用 start() 开始执行后,进入 RUNNING 或者 READY 状态。如果没有调用任何的阻塞函数,线程只会在 RUNNINGREADY 之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,除非手动调用 yield() 函数,放弃对 CPU 的占用。

一旦调用了图中的任何阻塞函数,线程就会进入 WAITING 或者 TIMED_WAITING 状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了 synchronized 关键字或者 synchronized 块,则会进入 BLOCKED 状态。

不太常见的阻塞 / 唤醒函数, LockSupport.park() / LockSupport.unpark() 。这对函数非常关键, Concurrent 包中 Lock 的实现即依赖这一对操作原语。

thread.isInterrupted() 与 Thread.interrupted() 的区别

因为 thread.interrupted() 相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于 WAITING 或者 TIMED_WAITING 状态,就会抛出一个 InterruptedException ,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处理。

这两个方法都是线程用来判断自己是否收到过中断信号的,前者是实例方法,后者是静态方法。二者的区别在于,前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。

线程的优雅关闭

stop 与 destory 函数

线程是“一段运行中的代码”,一个运行中的方法。运行到一半的线程能否强制杀死?

不能。在 Java 中,有 stop()destory() 等方法,但这些方法官方明确不建议使用。原因很简单,如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭。

因此,一个线程一旦运行起来,不要强行关闭,合理的做法是让其运行完(也就是方法执行完毕),干净地释放掉所有资源,然后退出。如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出。

守护线程

当在一个 JVM 进程里面开多个线程时,这些线程被分成两类:守护线程和非守护线程。默认都是非守护线程。

在 Java 中有一个规定:当所有的非守护线程退出后,整个 JVM 进程就会退出。意思就是守护线程“不算作数”,守护线程不影响整个 JVM 进程的退出。

例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线程)都退出之后,整个 JVM 进程就退出了。

设置关闭的标志位
public class MyThread extends Thread {

    private boolean flag = true;

    @Override
    public void run() {
        while (flag) {
            System.out.println("线程正在运行。。。");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 用于关闭线程
    public void stopRunning() {
        this.flag = false;
    }

    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();
        Thread.sleep(3000);
        // 置标志位,停止
        myThread.stopRunning();
        myThread.join();
        System.out.println("main线程退出");
    }

}

但上面的代码有一个问题:如果 MyThread twhile 循环中阻塞在某个地方,例如里面调用了 object.wait() 函数,那它可能永远没有机会再执行 while(!stopped) 代码,也就一直无法退出循环。

此时,就要用到 InterruptedExceptioninterrupt() 函数。

并发核心概念

并发与并行

在单个处理器上采用单核执行多个任务即为并发。在这种情况下,操作系统的任务调度程序会很快从一个任务切换到另一个任务,因此看起来所有的任务都是同时运行的。

同一时间内在不同计算机、处理器或处理器核心上同时运行多个任务,就是所谓的“并行”。

另一个关于并发的定义是,在系统上同时运行多个任务(不同的任务)就是并发。而另一个关于并行的定义是:同时在某个数据集的不同部分上运行同一任务的不同实例就是并行。

关于并行的最后一个定义是,系统中同时运行了多个任务。关于并发的最后一个定义是,一种解释程序员将任务和它们对共享资源的访问同步的不同技术和机制的方法。

这两个概念非常相似,而且这种相似性随着多核处理器的发展也在不断增强。

同步

在并发中,我们可以将 同步 定义为一种协调两个或更多任务以获得预期结果的机制。同步的方式有两种:

  • 控制同步:例如,当一个任务的开始依赖于另一个任务的结束时,第二个任务不能在第一个任务完成之前开始。
  • 数据访问同步:当两个或更多任务访问共享变量时,在任意时间里,只有一个任务可以访问该变量。

与同步密切相关的一个概念是 临界段。临界段是一段代码,由于它可以访问共享资源,因此在任何给定时间内,只能被一个任务执行。互斥 是用来保证这一要求的机制,而且可以采用不同的方式来实现。

并发系统中有不同的同步机制。从理论角度看,最流行的机制如下:

  • 信号量( semaphore ):一种用于控制对一个或多个单位资源进行访问的机制。它有一个用于存放可用资源数量的变量,而且可以采用两种原子操作来管理该变量。互斥( mutex , mutual exclusion 的简写形式)是一种特殊类型的信号量,它只能取两个值(即 资源空闲资源忙 ),而且只有将互斥设置为忙的那个进程才可以释放它。互斥可以通过保护临界段来帮助你避免出现竞争条件。
  • 监视器 :一种在共享资源上实现互斥的机制。它有一个互斥、一个条件变量、两种操作(等待条件和通报条件)。一旦你通报了该条件,在等待它的任务中只有一个会继续执行。

如果共享数据的所有用户都受到同步机制的保护,那么代码(或方法、对象)就是 线程安全 的。数据的非阻塞的CAS(compare-and-swap,比较和交换)原语是不可变的,这样就可以在并发应用程序中使用该代码而不会出任何问题。

不可变对象

不可变对象是一种非常特殊的对象。在其初始化后,不能修改其可视状态(其属性值)。如果想修改一个不可变对象,那么你就必须创建一个新的对象。

不可变对象的主要优点在于它是线程安全的。你可以在并发应用程序中使用它而不会出现任何问题。

不可变对象的一个例子就是 Java 中的 String 类。当你给一个 String 对象赋新值时,会创建一个新的 String 对象。

原子操作和原子变量

与应用程序的其他任务相比,原子操作 是一种发生在瞬间的操作。在并发应用程序中,可以通过一个临界段来实现原子操作,以便对整个操作采用同步机制。

原子变量 是一种通过原子操作来设置和获取其值的变量。可以使用某种同步机制来实现一个原子变量,或者也可以使用 CAS 以无锁方式来实现一个原子变量,而这种方式并不需要任何同步机制。

共享内存与消息传递

任务可以通过两种不同的方式来相互通信:

  • 第一种方法是 共享内存 ,通常用于在同一台计算机上运行多任务的情况。任务在读取和写入值的时候使用相同的内存区域。为了避免出现问题,对该共享内存的访问必须在一个由同步机制保护的临界段内完成。
  • 另一种同步机制是 消息传递 ,通常用于在不同计算机上运行多任务的情形。当一个任务需要与另一个任务通信时,它会发送一个遵循预定义协议的消息。如果发送方保持阻塞并等待响应,那么该通信就是同步的;如果发送方在发送消息后继续执行自己的流程,那么该通信就是异步的。

并发的问题

数据竞争

如果有两个或者多个任务在临界段之外对一个共享变量进行写入操作,也就是说没有使用任何同步机制,那么应用程序可能存在 数据竞争(也叫做 竞争条件 )。

死锁

当两个(或多个)任务正在等待必须由另一线程释放的某个共享资源,而该线程又正在等待必须由前述任务之一释放的另一共享资惊时,并发应用程序就出现了死锁。当系统中同时出现如下四种条件时,就会导致这种情形。我们将其称为 Coffman 条件:

  • 互斥: 死锁中涉及的资师、必须是不可共享的。一次只有一个任务可以使用该资源。
  • 占有并等待条件: 一个任务在占有某一互斥的资源时又请求另一互斥的资源。当它在等待时,不会释放任何资源。
  • 不可剥夺:资源只能被那些持有它们的任务释放。
  • 循环等待:任务 1 正等待任务 2 所占有的资源, 而任务 2 又正在等待任务 3 所占有的资源,以此类推,最终任务 n 又在等待由任务 1 所占有的资源,这样就出现了循环等待。

有一些机制可以用来避免死锁:

  • 忽略它们:这是最常用的机制。你可以假设自己的系统绝不会出现死锁,而如果发生死锁,结果就是你可以停止应用程序并且重新执行它。
  • 检测:系统中有一项专门分析系统状态的任务,可以检测是否发生了死锁。如果它检测到了死锁,可以采取一些措施来修复该问题,例如,结束某个任务或者强制释放某一资源。
  • 预防:如果你想防止系统出现死锁,就必须预防 Coffman 条件中的一条或多条出现。
  • 规避:如果你可以在某一任务执行之前得到该任务所使用资源的相关信息,那么死锁是可以规避的。当一个任务要开始执行时,你可以对系统中空闲的资源和任务所需的资源进行分析,这样就可以判断任务是否能够开始执行。

活锁

如果系统中有两个任务,它们总是因对方的行为而改变自己的状态, 那么就出现了活锁。最终结果是它们陷入了状态变更的循环而无法继续向下执行。

例如,有两个任务:任务 1 和任务 2 ,它们都需要用到两个资源:资源 1 和资源 2 。假设任务 1 对资源 1 加了一个锁,而任务 2 对资源 2 加了一个锁。当它们无法访问所需的资源时,就会释放自己的资源并且重新开始循环。这种情况可以无限地持续下去,所以这两个任务都不会结束自己的执行过程。

资源不足

当某个任务在系统中无法获取维持其继续执行所需的资源时,就会出现资源不足。当有多个任务在等待某一资源且该资源被释放时,系统需要选择下一个可以使用该资源的任务。如果你的系统中没有设计良好的算法,那么系统中有些线程很可能要为获取该资源而等待很长时间。

要解决这一问题就要确保公平原则。所有等待某一资源的任务必须在某一给定时间之内占有该资源。可选方案之一就是实现一个算法,在选择下一个将占有某一资源的任务时,对任务已等待该资源的时间因素加以考虑。然而,实现锁的公平需要增加额外的开销,这可能会降低程序的吞吐量

优先权反转

当一个低优先权的任务持有了一个高优先级任务所需的资源时,就会发生优先权反转。这样的话,低优先权的任务就会在高优先权的任务之前执行。

JMM,Java 内存模型

JMM 与 happen-before

为什么会存在“内存可见性”问题

x86 架构下 CPU 缓存的布局,即在一个 CPU 4 核下, L1 、 L2 、 L3 三级缓存与主内存的布局。每个核上面有 L1 、 L2 缓存, L3 缓存为所有核共用

因为存在 CPU 缓存一致性协议,例如 MESI ,多个 CPU 核心之间缓存不会出现不同步的问题,不会有“内存可见性”问题。缓存一致性协议对性能有很大损耗,为了解决这个问题,又进行了各种优化。例如,在计算单元和 L1 之间加了 Store Buffer 、 Load Buffer (还有其他各种 Buffer )

L1 、 L2 、 L3 和主内存之间是同步的,有缓存一致性协议的保证,但是 Store Buffer 、 Load Buffer 和 L1 之间却是异步的。向内存中写入一个变量,这个变量会保存在 Store Buffer 里面,稍后才异步地写入 L1 中,同时同步写入主内存中。

操作系统内核视角下的 CPU 缓存模型:

20210614. 并发编程 - 拉勾教育

多 CPU ,每个 CPU 多核,每个核上面可能还有多个硬件线程,对于操作系统来讲,就相当于一个个的逻辑 CPU 。每个逻辑 CPU 都有自己的缓存,这些缓存和主内存之间不是完全同步的。

对应到 Java 里,就是 JVM 抽象内存模型,如下图所示:

20210614. 并发编程 - 拉勾教育

重排序与内存可见性的关系

Store Buffer 的延迟写入是重排序的一种,称为内存重排序( Memory Ordering )。除此之外,还有编译器和 CPU 的指令重排序。

重排序类型:

  • 编译器重排序:对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序。
  • CPU 指令重排序:在指令级别,让没有依赖关系的多条指令并行。
  • CPU 内存重排序:CPU 有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致。

在三种重排序中,第三类就是造成“内存可见性”问题的主因,如下案例:

线程1:
X=1
a=Y
线程2:
Y=1
b=X

假设X、Y是两个全局变量,初始的时候,X=0,Y=0。请问,这两个线程执行完毕之后,a、b的正
确结果应该是什么?

很显然,线程 1 和线程 2 的执行先后顺序是不确定的,可能顺序执行,也可能交叉执行,最终正确的结果可能是:

1. a=0,b=1
2. a=1,b=0
3. a=1,b=1

也就是不管谁先谁后,执行结果应该是这三种场景中的一种。但实际可能是 a=0,b=0。

两个线程的指令都没有重排序,执行顺序就是代码的顺序,但仍然可能出现 a = 0 , b = 0 。原因是线程 1 先执行 X = 1 ,后执行 a = Y ,但此时 X = 1 还在自己的 Store Buffer 里面,没有及时写入主内存中。所以,线程 2 看到的 X 还是 0 。线程 2 的道理与此相同。

虽然线程 1 觉得自己是按代码顺序正常执行的,但在线程 2 看来, a = Y 和 X = 1 顺序却是颠倒的。指令没有重排序,是写入内存的操作被延迟了,也就是内存被重排序了,这就造成内存可见性问题。

内存屏障

为了禁止编译器重排序和 CPU 重排序,在编译器和 CPU 层面都有对应的指令,也就是内存屏障( Memory Barrier )。这也正是 JMM 和 happen-before 规则的底层实现原理。

编译器的内存屏障,只是为了告诉编译器不要对指令进行重排序。当编译完成之后,这种内存屏障就消失了, CPU 并不会感知到编译器中内存屏障的存在。而 CPU 的内存屏障是 CPU 提供的指令,可以由开发者显式调用。

内存屏障是很底层的概念,对于 Java 开发者来说,一般用 volatile 关键字就足够了。但从 JDK 8 开始, Java 在 Unsafe 类中提供了三个内存屏障函数,如下所示:

public final class Unsafe {
    // ...
    public native void loadFence();
    public native void storeFence();
    public native void fullFence();
    // ...
}

在理论层面,可以把基本的 CPU 内存屏障分成四种:

  • LoadLoad :禁止读和读的重排序。
  • StoreStore :禁止写和写的重排序。
  • LoadStore :禁止读和写的重排序。
  • StoreLoad :禁止写和读的重排序。

Unsafe 中的方法:

  • loadFence=LoadLoad+LoadStore
  • storeFence=StoreStore+LoadStore
  • fullFence=loadFence+storeFence+StoreLoad
as-if-serial 语义

重排序的原则是什么?什么场景下可以重排序,什么场景下不能重排序呢?

单线程程序的重排序规则

无论什么语言,站在编译器和CPU的角度来说,不管怎么重排序,单线程程序的执行结果不能改变,这就是单线程程序的重排序规则。

即只要操作之间没有数据依赖性,编译器和 CPU 都可以任意重排序,因为执行结果不会改变,代码看起来就像是完全串行地一行行从头执行到尾,这也就是 as-if-serial 语义。

对于单线程程序来说,编译器和 CPU 可能做了重排序,但开发者感知不到,也不存在内存可见性问题

多线程程序的重排序规则

编译器和 CPU 的这一行为对于单线程程序没有影响,但对多线程程序却有影响。

对于多线程程序来说,线程之间的数据依赖性太复杂,编译器和 CPU 没有办法完全理解这种依赖性并据此做出最合理的优化。

编译器和 CPU 只能保证每个线程的 as-if-serial 语义。

线程之间的数据依赖和相互影响,需要编译器和 CPU 的上层来确定。

上层要告知编译器和 CPU 在多线程场景下什么时候可以重排序,什么时候不能重排序

happen-before 是什么

使用 happen-before 描述两个操作之间的内存可见性。

Java 内存模型( JMM )是一套规范,在多线程中,一方面,要让编译器和 CPU 可以灵活地重排序;另一方面,要对开发者做一些承诺,明确告知开发者不需要感知什么样的重排序,需要感知什么样的重排序。然后,根据需要决定这种重排序对程序是否有影响。如果有影响,就需要开发者显示地通过 volatilesynchronized 等线程同步机制来禁止重排序。

关于 happen-before : 如果 A happen-before B ,意味着 A 的执行结果必须对 B 可见,也就是保证跨线程的内存可见性。 A happen before B 不代表 A 一定在 B 之前执行。因为,对于多线程程序而言,两个操作的执行顺序是不确定的。 happen-before 只确保如果 A 在 B 之前执行,则 A 的执行结果必须对 B 可见。定义了内存可见性的约束,也就定义了一系列重排序的约束。

基于 happen-before 的这种描述方法,JMM 对开发者做出了一系列承诺:

  • 单线程中的每个操作,happen-before 对应该线程中任意后续操作(也就是 as-if-serial 语义保证)。
  • volatile 变量的写入,happen-before 对应后续对这个变量的读取。
  • synchronized 的解锁,happen-before 对应后续对这个锁的加锁。

JMM 对编译器和 CPU 来说,volatile 变量不能重排序;非 volatile 变量可以任意重排序。

happen-before 的传递性

除了这些基本的 happen-before 规则, happen-before 还具有传递性,即若 A happen-before B , B happen-before C ,则 A happen-before C 。

如果一个变量不是 volatile 变量,当一个线程读取、一个线程写入时可能有问题。那岂不是说,在多线程程序中,我们要么加锁,要么必须把所有变量都声明为 volatile 变量?这显然不可能,而这就得归功于 happen-before 的传递性。

volatile 关键字

64 位写入的原子性(Half Write)

对于一个 long 型变量的赋值和取值操作而言,在多线程场景下,线程 A 调用 set(100) ,线程 B 调用 get() ,在某些场景下,返回值可能不是 100 。

因为 JVM 的规范并没有要求 64 位的 long 或者 double 的写入是原子的。在 32 位的机器上,一个 64 位变量的写入可能被拆分成两个 32 位的写操作来执行。这样一来,读取的线程就可能读到“一半的值”。解决办法也很简单,在 long 前面加上 volatile 关键字。

重排序:DCL 问题

单例模式的线程安全的写法不止一种,常用写法为 DCL(Double Checking Locking)

public class Singleton {
    private static Singleton instance;
    
    public static Singleton getInstance() {
        if (instance == null) {
            synchronized(Singleton.class) {
                if (instance == null) {
                    // 此处代码有问题
                    instance = new Singleton();
                }
            }
        } 
        return instance;
    }
}

上述的 instance = new Singleton(); 代码有问题:其底层会分为三个操作:

  1. 分配一块内存。
  2. 在内存上初始化成员变量。
  3. instance 引用指向内存。

在这三个操作中,操作 2 和操作 3 可能重排序,即先把 instance 指向内存,再初始化成员变量,因为二者并没有先后的依赖关系。此时,另外一个线程可能拿到一个未完全初始化的对象。这时,直接访问里面的成员变量,就可能出错。这就是典型的“构造方法溢出”问题。

解决办法也很简单,就是为 instance 变量加上 volatile 修饰。

volatile 的三重功效: 64 位写入的原子性、内存可见性和禁止重排序。

volatile 实现原理

由于不同的 CPU 架构的缓存体系不一样,重排序的策略不一样,所提供的内存屏障指令也就有差异。

这里只探讨为了实现 volatile 关键字的语义的一种参考做法:

  1. volatile 写操作的前面插入一个 StoreStore 屏障。保证 volatile 写操作不会和之前的写操作重排序。
  2. volatile 写操作的后面插入一个 StoreLoad 屏障。保证 volatile 写操作不会和之后的读操作重排序。
  3. volatile 读操作的后面插入一个 LoadLoad 屏障 + LoadStore 屏障。保证 volatile 读操作不会和之后的读操作、写操作重排序。

具体到 x86 平台上,其实不会有 LoadLoadLoadStoreStoreStore 重排序,只有 StoreLoad 一种重排序(内存屏障),也就是只需要在 volatile 写操作后面加上 StoreLoad 屏障。

JSR-133 对 volatile 语义的增强

在 JSR-133 之前的旧内存模型中,一个 64 位 long/double 型变量的读/写操作可以被拆分为两个 32 位的读 / 写操作来执行。从 JSR-133 内存模型开始 (即从 JDK 5 开始),仅仅只允许把一个 64 位 long/ double 型变量的写操作拆分为两个 32 位的写操作来执行,任意的读操作在 JSR-133 中都必须具有原子性(即 任意读操作必须要在单个读事务中执行)。

这也正体现了 Java 对 happen-before 规则的严格遵守。

final 关键字

构造方法溢出问题
/**
 * 构造方法溢出问题
 */
public class MyClass {
    private int num1;
    private int num2;
    private static MyClass myClass;

    public MyClass() {
        num1 = 1;
        num2 = 2;
    }

    /**
     * 线程A先执行write()
     */
    public static void write() {
        myClass = new MyClass();
    }

    /**
     * 线程B接着执行write()
     */
    public static void read() {
        if (myClass != null) {
            int num3 = myClass.num1;
            int num4 = myClass.num2;
        }
    }
}

num3 和 num4 的值是否一定是 1 和 2 ? num3 、 num4 不见得一定等于 1 , 2 。和 DCL 的例子类似,也就是构造方法溢出问题

myClass = new MyClass() 这行代码,分解成三个操作:

  1. 分配一块内存;
  2. 在内存上初始化 i=1,j=2;
  3. 把 myClass 指向这块内存。

操作 2 和操作 3 可能重排序,因此线程 B 可能看到未正确初始化的值。对于构造方法溢出,就是一个对象的构造并不是“原子的”,当一个线程正在构造对象时,另外一个线程却可以读到未构造好的“一半对象”。

final 的 happen-before 语义

要解决这个问题,不止有一种办法:

  • 办法 1 :给 num1 , num2 加上 volatile 关键字。
  • 办法 2 :为 read/write 方法都加上 synchronized 关键字。
  • 如果 num1 , num2 只需要初始化一次,还可以使用 final 关键字。

之所以能解决问题,是因为同 volatile 一样, final 关键字也有相应的 happen-before 语义:

  1. final 域的写(构造方法内部), happen-before 于后续对 final 域所在对象的读。
  2. final 域所在对象的读, happen-before 于后续对 final 域的读。

通过这种 happen-before 语义的限定,保证了 final 域的赋值,一定在构造方法之前完成,不会出现另外一个线程读取到了对象,但对象里面的变量却还没有初始化的情形,避免出现构造方法溢出的问题。

happen-before 规则总结

  1. 单线程中的每个操作,happen-before 对应该线程中任意后续操作(也就是 as-if-serial 语义保证)。
  2. volatile 变量的写,happen-before 于后续对这个变量的读。
  3. synchronized 的解锁,happen-before 于后续对这个锁的加锁。
  4. final 变量的写,happen-before 于 final 域对象的读,happen-before 于后续对 final 变量的读。

四个基本规则再加上 happen-before 的传递性,就构成 JMM 对开发者的整个承诺。在这个承诺以外的部分,程序都可能被重排序,都需要开发者小心地处理内存可见性问题。

20210614. 并发编程 - 拉勾教育

JUC

并发容器

BlockingQueue

在所有的并发容器中, BlockingQueue 是最常见的一种。 BlockingQueue 是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。

java.util.concurrent 包中, BlockingQueue 是一个接口,有许多个不同的实现类

该接口和 JDK 集合包中的 Queue 接口是兼容的,同时在其基础上增加了阻塞功能。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的环形队列,在构造方法中,会要求传入数组的容量

LinkedBlockingQueue

LinkedBlockingQueue 是一种基于单向链表的阻塞队列。因为队头和队尾是 2 个指针分开操作的,所以用了 2 把锁 +2 个条件,同时有 1 个 AtomicInteger 的原子变量记录 count 数。

在其构造方法中,也可以指定队列的总容量。如果不指定,默认为 Integer.MAX_VALUE

LinkedBlockingQueueArrayBlockingQueue 的差异:

  • 为了提高并发度,用 2 把锁,分别控制队头、队尾的操作。意味着在 putput 之间、 taketake 之间是互斥的, puttake 之间并不互斥。但对于 count 变量,双方都需要操作,所以必须是原子类型。
  • 因为各自拿了一把锁,所以当需要调用对方的 conditionsignal 时,还必须再加上对方的锁,就是 signalNotEmpty()signalNotFull() 方法。
  • 不仅 put 会通知 taketake 也会通知 put 。当 put 发现非满的时候,也会通知其他 put 线程;当 take 发现非空的时候,也会通知其他 take 线程。
PriorityBlockingQueue

队列通常是先进先出的,而 PriorityBlockingQueue 是按照元素的优先级从小到大出队列的。正因为如此, PriorityBlockingQueue 中的 2 个元素之间需要可以比较大小,并实现 Comparable 接口。

在阻塞的实现方面,和 ArrayBlockingQueue 的机制相似,主要区别是用数组实现了一个二叉堆,从而实现按优先级从小到大出队列。另一个区别是没有 notFull 条件,当元素个数超出数组长度时,执行扩容操作。

DelayQueue

DelayQueue 即延迟队列,也就是一个按延迟时间从小到大出队的 PriorityQueue 。所谓延迟时间,就是“未来将要执行的时间”减去“当前时间”。为此,放入 DelayQueue 中的元素,必须实现 Delayed 接口。

关于 java.util.concurrent.Delayed 接口:

  • 如果 getDelay 的返回值小于或等于 0 ,则说明该元素到期,需要从队列中拿出来执行。
  • 该接口首先继承了 Comparable 接口,所以要实现该接口,必须实现 Comparable 接口。具体来说,就是基于 getDelay() 的返回值比较两个元素的大小。

关于 take() 方法:不同于一般的阻塞队列,只在队列为空的时候,才阻塞。如果堆顶元素的延迟时间没到,也会阻塞。

关于 put() 方法:不是每放入一个元素,都需要通知等待的线程。放入的元素,如果其延迟时间大于当前堆顶
的元素延迟时间,就没必要通知等待的线程;只有当延迟时间是最小的,在堆顶时,才有必要通知等待的线程

SynchronousQueue

SynchronousQueue 是一种特殊的 BlockingQueue ,它本身没有容量。先调 put ,线程会阻塞;直到另外一个线程调用了 take ,两个线程才同时解锁,反之亦然。对于多个线程而言,例如 3 个线程,调用 3 次 put , 3 个线程都会阻塞;直到另外的线程调用 3 次 take , 6 个线程才同时解锁,反之亦然。

BlockingDeque

BlockingDeque 定义了一个阻塞的双端队列接口

该接口继承了 BlockingQueue 接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是 LinkedBlockingDeque

实现原理,和 LinkedBlockingQueue 基本一样,只是 LinkedBlockingQueue 是单向链表,而 LinkedBlockingDeque 是双向链表

CopyOnWrite

CopyOnWrite 指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。

CopyOnWriteArrayList

ArrayList 一样, CopyOnWriteArrayList 的核心数据结构也是一个数组

CopyOnWriteArraySet

CopyOnWriteArraySet 就是用 Array 实现的一个 Set ,保证所有元素都不重复。其内部是封装的一个 CopyOnWriteArrayList

ConcurrentLinkedQueue/Deque

AQS 内部的阻塞队列实现原理:基于双向链表,通过对 head/tail 进行 CAS 操作,实现入队和出队。

ConcurrentLinkedQueue 的实现原理和 AQS 内部的阻塞队列类似:同样是基于 CAS ,同样是通过 head/tail 指针记录队列头部和尾部,但还是有稍许差别。

ConcurrentHashMap

HashMap 通常的实现方式是“数组 + 链表”,这种方式被称为“拉链法”。 ConcurrentHashMap 在这个基本原理之上进行了各种优化。首先是所有数据都放在一个大的 HashMap 中;其次是引入了红黑树。

20210614. 并发编程 - 拉勾教育

如果头节点是 Node 类型,则尾随它的就是一个普通的链表;如果头节点是 TreeNode 类型,它的后面就是一颗红黑树, TreeNodeNode 的子类。

链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红黑树;反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。

那为什么要做这种设计呢?

  • 使用红黑树,当一个槽里有很多元素时,其查询和更新速度会比链表快很多, Hash 冲突的问题由此得到较好的解决。
  • 加锁的粒度,并非整个 ConcurrentHashMap ,而是对每个头节点分别加锁,即并发度,就是 Node 数组的长度,初始长度为 16 。
  • 并发扩容,这是难度最大的。当一个线程要扩容 Node 数组的时候,其他线程还要读写,因此处理过程很复杂。

由上述对比可以总结出来:这种设计一方面降低了 Hash 冲突,另一方面也提升了并发度。

ConcurrentSkipListMap/Set

ConcurrentHashMap 是一种 key 无序的 HashMapConcurrentSkipListMap 则是 key 有序的,实现了 NavigableMap 接口,此接口又继承了 SortedMap 接口。

ConcurrentSkipListMap
为什么要使用 SkipList 实现 Map

在 Java 的 util 包中,有一个非线程安全的 key 有序的 HashMap ,也就是 TreeMap,基于红黑树实现。而在 Concurrent 包中,提供的 key 有序的 HashMap ,也就是 ConcurrentSkipListMap ,是基于 SkipList (跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法

那为什么 SkipList 可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起

无锁链表

在前面讲解 AQS 时,曾反复用到无锁队列,其实现也是链表。究竟二者的区别在哪呢?前面讲的无锁队列、栈,都是只在队头、队尾进行 CAS 操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的 CAS 做法,就会出现问题!

跳查表

跳查表就是多层链表叠起来的

ConcurrentSkipListSet

ConcurrentSkipListSet 只是对 ConcurrentSkipListMap 的简单封装

同步工具类

Semaphore

Semaphore 也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单

// 一开始有5份共享资源。第二个参数表示是否是公平
Semaphore myResources = new Semaphore(5, true);
// 工作线程每获取一份资源,就在该对象上记下来
// 在获取的时候是按照公平的方式还是非公平的方式,就要看上一行代码的第二个参数了。
// 一般非公平抢占效率较高。
myResources.acquire();
// 工作线程每归还一份资源,就在该对象上记下来
// 此时资源可以被其他线程使用
myResources.release();
/*
释放指定数目的许可,并将它们归还给信标。
可用许可数加上该指定数目。
如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。
如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。
*/
semaphore.release(2);
/*
从信标获取指定数目的许可。如果可用许可数目不够,则线程阻塞,直到被中断。
该方法效果与循环相同,
for (int i = 0; i < permits; i++) acquire();
只不过该方法是原子操作。
如果可用许可数不够,则当前线程阻塞,直到:(二选一)
1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字;
2. 其他线程中断了当前线程。
permits – 要获取的许可数
*/
semaphore.acquire(3);

案例:
大学生到自习室抢座,写作业:

public class Main {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < 10; i++) {
            new MyThread("学生-" + (i + 1), semaphore).start();
        }

    }
}
public class MyThread extends Thread {
    private final Semaphore semaphore;
    private final Random random = new Random();

    public MyThread(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }


    @Override
    public void run() {

        try {
            // 获取信标:抢座
            semaphore.acquire();
            // 抢到之后开始写作业
            System.out.println(Thread.currentThread().getName() + " - 抢到了座位,开始写作业");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " - 作业写完,腾出座位");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 释放信标:腾出座位
        semaphore.release();
    }
}

CountDownLatch

CountDownLatch 使用场景

假设一个主线程要等待 5 个 Worker 线程执行完才能退出,可以使用 CountDownLatch 来实现:

public class MyThread extends Thread {
    private final CountDownLatch latch;
    private final Random random = new Random();

    public MyThread(String name, CountDownLatch latch) {
        super(name);
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(random.nextInt(2000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " - 执行完毕");
        // latch计数减一
        latch.countDown();
    }
}
public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0; i < 4; i++) {
            new MyThread("线程" + (i + 1), latch).start();
        }

        // main线程等待
        latch.await();
        System.out.println("main线程执行结束");

    }
}

CyclicBarrier

该类用于协调多个线程同步执行操作的场合。

使用场景: 10 个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。

public class MyThread extends Thread {

    private final CyclicBarrier barrier;
    private final Random random = new Random();

    public MyThread(String name, CyclicBarrier barrier) {
        super(name);
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " - 向公司出发");
            Thread.sleep(random.nextInt(5000));
            System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
            // 等待其他线程该阶段结束
            barrier.await();

            System.out.println(Thread.currentThread().getName() + " - 开始笔试");
            Thread.sleep(random.nextInt(5000));
            System.out.println(Thread.currentThread().getName() + " - 笔试结束");
            // 等待其他线程该阶段结束
            barrier.await();

            System.out.println(Thread.currentThread().getName() + " - 开始面试");
            Thread.sleep(random.nextInt(5000));
            System.out.println(Thread.currentThread().getName() + " - 面试结束");

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
public class Main {
    public static void main(String[] args) {
        //        CyclicBarrier barrier = new CyclicBarrier(5);
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("该阶段结束");
            }
        });

        for (int i = 0; i < 5; i++) {
            new MyThread("线程-" + (i + 1), barrier).start();
        }

    }
}

Exchanger

Exchanger 用于线程之间交换数据

public class Main {
    private static final Random random = new Random();

    public static void main(String[] args) {
        // 建一个多线程共用的exchange对象
        // 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自己的数据作为参数
        // 传递进去,返回值是另外一个线程调用exchange传进去的参数
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread("线程1") {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为止。
                        String otherData = exchanger.exchange("交换数据1");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("线程2") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据2");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("线程3") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据3");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

在上面的例子中,3 个线程并发地调用 exchange(...) ,会两两交互数据,如 1/2、1/3 和 2/3 。

Phaser

从 JDK7 开始,新增了一个同步工具类 Phaser ,其功能比 CyclicBarrierCountDownLatch 更加强大。

Phaser 替代 CountDownLatch :考虑讲 CountDownLatch 时的例子, 1 个主线程要等 10 个 Worker 线程完成之后,才能做接下来的事情,也可以用 Phaser 来实现此功能。在 CountDownLatch 中,主要是 2 个方法: await()countDown() ,在 Phaser 中,与之相对应的方法是 awaitAdance(int n)arrive()

Phaser 替代 CyclicBarrierPhaserarriveAndAwaitAdvance() 对应 CyclicBarrierawait()

Phaser 新特性
动态调整线程个数

CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地调整要同步的线程个数。 Phaser 提供了下面这些方法来增加、减少所要同步的线程个数。

register() // 注册一个
bulkRegister(int parties) // 注册多个
arriveAndDeregister() // 解除注册
层次 Phaser

多个 Phaser 可以组成树状结构,可以通过在构造方法中传入父 Phaser 来实现。

Phaser 的内部结构中,每个 Phaser 记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。

Phaser 并不用感知子 Phaser 的存在,当子 Phaser 中注册的参与者数量大于 0 时,会把自己向父节点注册;当子 Phaser 中注册的参与者数量等于 0 时,会自动向父节点解除注册。父 Phaser 把子 Phaser 当作一个正常参与的线程就即可。

Atomic 类

AtomicInteger 和 AtomicLong

悲观锁与乐观锁

对于悲观锁,认为数据发生并发冲突的概率很大,读操作之前就上锁。 synchronized 关键字,后面要讲的 ReentrantLock 都是悲观锁的典型。

对于乐观锁,认为数据发生并发冲突的概率比较小,读操作之前不上锁。等到写操作的时候,再判断数据在此期间是否被其他线程修改了。如果被其他线程修改了,就把数据重新读出来,重复该过程;如果没有被修改,就写回去。判断数据是否被修改,同时写回新值,这两个操作要合成一个原子操作,也就是 CAS ( Compare And Set )。

AtomicInteger 的实现就是典型的乐观锁。

Unsafe 的 CAS 详解

Unsafe 类是整个 Concurrent 包的基础

自旋与阻塞

当一个线程拿不到锁的时候,有以下两种基本的等待策略:

  • 策略1:放弃 CPU ,进入阻塞状态,等待后续被唤醒,再重新被操作系统调度。
  • 策略2:不放弃 CPU ,空转,不断重试,也就是所谓的 自旋

很显然,如果是单核的CPU,只能用策略1。因为如果不放弃CPU,那么其他线程无法运行,也就无法释放锁。但对于多CPU或者多核,策略2就很有用了,因为没有线程切换的开销。

AtomicInteger 的实现就用的是“自旋”策略,如果拿不到锁,就会一直重试。

注意:以上两种策略并不互斥,可以结合使用。如果获取不到锁,先自旋;如果自旋还拿不到锁,再阻塞,synchronized 关键字就是这样的实现策略。

除了 AtomicIntegerAtomicLong 也是同样的原理。

AtomicBoolean 和 AtomicReference

为什么需要 AtomicBoolean

对于 int 或者 long 型变量,需要进行加减操作,所以要加锁;但对于一个 boolean 类型来说, truefalse 的赋值和取值操作,加上 volatile 关键字就够了,为什么还需要 AtomicBoolean 呢?

这是因为往往要实现下面这种功能:

if (!flag) {
    flag = true;
    // ...
} 

// 或者更清晰一点的:
if (flag == false) {
    flag = true;
    // ...
}

也就是要实现 compare 和 set 两个操作合在一起的原子性,而这也正是 CAS 提供的功能。上面的代码,就变成:

if (compareAndSet(false, true)) {
    // ...
}

同样地,AtomicReference 也需要同样的功能

如何支持 boolean 和 double 类型

Unsafe 类中,只提供了三种类型的 CAS 操作: intlongObject (也就是引用类型)。

在 jdk 的实现中,这三种 CAS 操作都是由底层实现的,其他类型的 CAS 操作都要转换为这三种之一进行操作。

AtomicBoolean 类型如何支持?对于用 int 型来代替的,在入参的时候,将 boolean 类型转换成 int 类型;在返回值的时候,将 int 类型转换成 boolean 类型。

如果是 double 类型,又如何支持呢?这依赖 double 类型提供的一对 double 类型和 long 类型互转的方法:

public static native double longBitsToDouble(long bits);
public static native long doubleToRawLongBits(double value);

AtomicStampedReference 和 AtomicMarkableReference

到目前为止, CAS 都是基于“值”来做比较的。但如果另外一个线程把变量的值从 A 改为 B ,再从 B 改回到 A ,那么尽管修改过两次,可是在当前线程做 CAS 操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的 ABA 问题

要解决 ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是 AtomicStampedReference 做的事情

为什么没有 AtomicStampedInteger 或 AtomictStampedLong

要解决 Integer 或者 Long 型变量的 ABA 问题,为什么只有 AtomicStampedReference ,而没有 AtomicStampedInteger 或者 AtomictStampedLong 呢?

因为这里要同时比较数据的“值”和“版本号”,而 Integer 型或者 Long 型的 CAS 没有办法同时比较两个变量。

于是只能把值和版本号封装成一个对象,也就是这里面的 Pair 内部类,然后通过对象引用的 CAS 来实现。

AtomicMarkableReference

AtomicMarkableReferenceAtomicStampedReference 原理类似,只是 Pair 里面的版本号是 boolean 类型的,而不是整型的累加变量

因为是 boolean 类型,只能有 truefalse 两个版本号,所以并不能完全避免 ABA 问题,只是降低了 ABA 发生的概率。

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater

为什么需要 AtomicXXXFieldUpdater

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为 Atomic 类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要 AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater

AtomicIntegerFieldUpdater 是一个抽象类。 方法 newUpdater 用于创建 AtomicIntegerFieldUpdater 类对象

public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}

newUpdater 静态方法传入的是要修改的类(不是对象)和对应的成员变量的名字,内部通过反射拿到这个类的成员变量,然后包装成一个 AtomicIntegerFieldUpdater 对象。所以,这个对象表示的是类的某个成员,而不是对象的成员变量。

若要修改某个对象的成员变量的值,再传入相应的对象

public int getAndIncrement(T obj)
public int getAndSet(T obj, int newValue)
    ......

要想使用 AtomicIntegerFieldUpdater 修改成员变量,成员变量必须是 volatileint 类型(不能是 Integer 包装类)

至于 AtomicLongFieldUpdaterAtomicReferenceFieldUpdater ,也有类似的限制条件。其底层的 CAS 原理,也和 AtomicLongAtomicReference 一样。

AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray

Concurrent 包提供了 AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray 三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。

// 这里入参 i 是数组下标
public final int getAndSet(int i, int newValue)

另外两个数组的原子类实现原理与之类似

Striped64 与 LongAdder

从 JDK 8 开始,针对 Long 型的原子操作, Java 又提供了 LongAdderLongAccumulator ;针对 Double 类型, Java 提供了 DoubleAdderDoubleAccumulator 。它们都继承自 Striped64 。 Striped 意为“条带”,也就是分片。

LongAdder 原理

把一个 Long 型拆成一个 base 变量外加多个 Cell ,每个 Cell 包装了一个 Long 型变量。当多个线程并发累加的时候,如果并发度低,就直接加到 base 变量上;如果并发度高,冲突大,平摊到这些 Cell 上。在最后取值的时候,再把 base 和这些 Cellsum 运算。

最终一致性

sum 求和方法中,并没有对 cells[] 数组加锁。也就是说,一边有线程对其执行求和操作,一边还有线程修改数组里的值,也就是最终一致性,而不是强一致性。这也类似于 ConcurrentHashMap 中的 clear() 方法,一边执行清空操作,一边还有线程放入数据, clear() 方法调用完毕后再读取, hash map 里面可能还有元素。因此,在 LongAdder 适合高并发的统计场景,而不适合要对某个 Long 型变量进行严格同步的场景。

伪共享与缓存行填充

Cell 类的定义中,用了一个独特的注解 @sun.misc.Contended ,这是 JDK 8 之后才有的,背后涉及一个很重要的优化原理:伪共享与缓存行填充。

声明一个 @sun.misc.Contended 即可实现缓存行的填充。之所以这个地方要用缓存行填充,是为了不让 Cell [] 数组中相邻的元素落到同一个缓存行里。

LongAccumulator

LongAccumulator 的原理和 LongAdder 类似,只是功能更强大

LongAdder 只能进行累加操作,并且初始值默认为 0LongAccumulator 可以自己定义一个二元操作符,并且可以传入一个初始值。

DoubleAdder 与 DoubleAccumulator

DoubleAdder 其实也是用 long 型实现的,因为没有 double 类型的 CAS 方法。

DoubleAccumulatorDoubleAdder 的关系,与 LongAccumulatorLongAdder 的关系类似,只是多了一个二元操作符。

Lock 与 Condition

互斥锁

锁的可重入性

“可重入锁”是指当一个线程调用 object.lock() 获取到锁,进入临界区后,再次调用 object.lock() ,仍然可以获取到该锁。显然,通常的锁都要设计成可重入的,否则就会发生死锁。 synchronized 关键字,就是可重入锁。

java.util.concurrent.locks.Lock 接口

ReentrantLock 本身没有代码逻辑,实现都在其内部类 Sync 中,Sync 实现了 Lock 接口

锁的公平性 vs 非公平性

Sync 是一个抽象类,它有两个子类 FairSyncNonfairSync ,分别对应公平锁和非公平锁,默
认为非公平的。

一个新的线程来了之后,看到有很多线程在排队,自己排到队伍末尾,这叫公平;线程来了之后直接去抢锁,这叫作不公平。默认设置的是非公平锁,其实是为了提高效率,减少线程切换。

让我们考虑一种情况,假设线程 A 持有一把锁,线程 B 请求这把锁,由于线程 A 已经持有这把锁了,所以线程 B 会陷入等待,在等待的时候线程 B 会被挂起,也就是进入阻塞状态,那么当线程 A 释放锁的时候,本该轮到线程 B 苏醒获取锁,但如果此时突然有一个线程 C 插队请求这把锁,那么根据非公平的策略,会把这把锁给线程 C,这是因为唤醒线程 B 是需要很大开销的,很有可能在唤醒之前,线程 C 已经拿到了这把锁并且执行完任务释放了这把锁。相比于等待唤醒线程 B 的漫长过程,插队的行为会让线程 C 本身跳过陷入阻塞的过程,如果在锁代码中执行的内容不多的话,线程 C 就可以很快完成任务,并且在线程 B 被完全唤醒之前,就把这个锁交出去,这样是一个双赢的局面,对于线程 C 而言,不需要等待提高了它的效率,而对于线程 B 而言,它获得锁的时间并没有推迟,因为等它被唤醒的时候,线程 C 早就释放锁了,因为线程 C 的执行速度相比于线程 B 的唤醒速度,是很快的,所以 Java 设计非公平锁,是为了提高整体的运行效率

锁实现的基本原理

Sync 的父类 AbstractQueuedSynchronizer 经常被称作队列同步器( AQS ),这个类非常重要,该类的父类是 AbstractOwnableSynchronizer

java.util.concurrent.locks.AbstractQueuedSynchronizer

此处的锁具备 synchronized 功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

  1. 需要一个 state 变量,标记该锁的状态。 state 变量至少有两个值: 0 、 1 。对 state 变量的操作,使用 CAS 保证线程安全。java.util.concurrent.locks.AbstractQueuedSynchronizer#state

  2. 需要记录当前是哪个线程持有锁。java.util.concurrent.locks.AbstractOwnableSynchronizer#exclusiveOwnerThread

  3. 需要底层支持对一个线程进行阻塞或唤醒操作。工具类 java.util.concurrent.locks.LockSupport ,在当前线程中调用 park() ,该线程就会被阻塞;在另外一个线程中,调用 unpark(Thread thread) ,传入一个被阻塞的线程,就可以唤醒阻塞在 park() 地方的线程。

  4. 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用 CAS 。在 AQS 中利用双向链表和 CAS 实现了一个阻塞队列

    private transient volatile Node head;
    private transient volatile Node tail;
    
    static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;		// 每个Node对应一个被阻塞的线程
    }
    

阻塞队列是整个AQS核心中的核心。head 指向双向链表头部, tail 指向双向链表尾部。入队就是把新的 Node 加到 tail 后,初始的时候, head = tail = NULL ;然后,在往队列中加入阻塞的线程时,会新建一个空的 Node ,让 headtail 都指向这个空 Node ;之后,在后面加入被阻塞的线程对象。所以,当 head = tail 的时候,说明队列为空。

公平与非公平的 lock() 实现差异

如果是公平锁,会看等待队列中是否存在等待的线程。

tryLock() 实现分析

tryLock() 实现基于调用非公平锁的 tryAcquire(...),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。

如果需要公平性,可以使用 tryLock(0, TimeUnit.SECONDS)

读写锁

和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间不互斥。

读读不互斥,读写互斥,写写互斥

ReadWriteLock 是一个接口,内部由两个 Lock 接口组成。

java.util.concurrent.locks.ReadWriteLock

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

ReentrantReadWriteLock 实现了该接口

ReadLockWriteLock 是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两个视图呢?可以理解为是一把锁,线程分成两类:读线程和写线程。读线程和写线程之间不互斥(可以同时拿到这把锁),读线程之间不互斥,写线程之间互斥。

Condition

Condition 与 Lock 的关系

Condition 本身也是一个接口,其功能和 wait / notify 类似

java.util.concurrent.locks.Condition

wait() / notify() 必须和 synchronized 一起使用, Condition 也必须和 Lock 一起使用。

StampedLock

StampedLock 是在 JDK 8 中新增的

并发度
ReentrantLock 读读互斥,读写互斥,写写互斥
ReentrantReadWriteLock 读读不互斥,读写互斥,写写互斥
StampedLock 读读不互斥,读写不互斥,写写互斥

可以看到,从 ReentrantLockStampedLock ,并发度依次提高。

另一方面,因为 ReentrantReadWriteLock 采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。

StampedLock 引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。

线程池与 Future

线程池的实现原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者 - 消费者模型。

20210614. 并发编程 - 拉勾教育

这里队列就是阻塞队列。

线程池的类继承体系

20210614. 并发编程 - 拉勾教育

在这里,有两个核心的类: ThreadPoolExectorScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。

向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的 execute(Runnable command) 向线程池提交任务。

然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值
的任务,也就是 Callable

ThreadPoolExecutor

ThreadPoolExector 的核心数据结构:

public class ThreadPoolExecutor extends AbstractExecutorService {
    //...
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 存放任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 对线程池内部各种变量进行互斥访问控制
    private final ReentrantLock mainLock = new ReentrantLock();
    // 线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //...
}

WorkerThreadPoolExector 的内部类

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // ...
    final Thread thread; // Worker封装的线程
    Runnable firstTask; // Worker接收到的第1个任务
    volatile long completedTasks; // Worker执行完毕的任务个数
    // ...
}

Worker 继承于 AQS ,也就是说 Worker 本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

核心配置参数解释

ThreadPoolExecutor 在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}
参数 默认值 描述
corePoolSize 在线程池中始终维护的线程个数
maximumPoolSize corePooSize 已满、队列也满的情况下,扩充线程至此值
keepAliveTime maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回 corePoolSize
unit keepAliveTime 的时间单位
workQueue 线程池所用的阻塞队列类型
threadFactory java.util.concurrent.Executors.DefaultThreadFactory 线程创建工厂
handler java.util.concurrent.ThreadPoolExecutor.AbortPolicy corePoolSize 已满,队列已满,maxPoolSize 已满,最后的拒绝策略

下面来看这些配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:

  1. 步骤一:判断当前线程数是否大于或等于 corePoolSize 。如果小于,则新建线程执行;如果大于,则进入步骤二。
  2. 步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。
  3. 步骤三:判断当前线程数是否大于或等于 maxPoolSize 。如果小于,则新建线程执行;如果大于,则进入步骤四。
  4. 步骤四:根据拒绝策略,拒绝任务。

总结一下:首先判断 corePoolSize ,其次判断 blockingQueue 是否已满,接着判断 maxPoolSize ,最后使用拒绝策略。

很显然,基于这种流程,如果队列是*的,将永远没有机会走到步骤三,也即 maxPoolSize 没有使用,也一定不会走到步骤四。

线程池的优雅关闭

线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

线程池的生命周期

线程池的状态有五种,分别是 RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED

20210614. 并发编程 - 拉勾教育

线程池有两个关闭方法, shutdown()shutdownNow() ,这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入 TIDYING 状态;最后执行一个钩子方法 terminated() ,进入 TERMINATED 状态,线程池才真正关闭。

这里的状态迁移有一个非常关键的特征:从小到大迁移,-10123,只会从小的状态值往大
的状态值迁移,不会逆向迁移。例如,当线程池的状态在 TIDYING=2 时,接下来只可能迁移到 TERMINATED=3 ,不可能迁移回 STOP=1 或者其他状态。

terminated() 之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
正确关闭线程池的步骤

关闭线程池的过程为:在调用 shutdown() 或者 shutdownNow() 之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow();
executor.shutdown();
try {
    boolean flag = true;
    do {
        flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
    } while (flag);
} catch (InterruptedException e) {
    // ...
}
shutdown() 与 shutdownNow() 的区别
  • shutdown() 不会清空任务队列,会等所有任务执行完成, shutdownNow() 会清空任务队列。
  • shutdown() 只会中断空闲的线程, shutdownNow() 会中断所有线程。

TIDYINGTREMINATED 的区别是在二者之间执行了一个钩子方法 terminated()

线程池的 4 种拒绝策略

RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是 AbortPolicy

  • CallerRunsPolicy : 调用者直接在自己的线程里执行,线程池不处理
  • AbortPolicy : 线程池抛异常
  • DiscardPolicy : 线程池直接丢掉任务,不抛出异常
  • DiscardOldestPolicy : 删除队列中最早的任务,将当前任务入队列
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger(0);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1, TimeUnit.SECONDS,
                // new ArrayBlockingQueue<>(3)
                new LinkedBlockingQueue<>()
                ,
                // new ThreadPoolExecutor.AbortPolicy()
                // new ThreadPoolExecutor.CallerRunsPolicy()
                // new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 20; i++) {
            int finalI = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 开始");
                    try {
                        count.getAndIncrement();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 结束");
                }
            });
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        executor.shutdown();
        boolean flag = true;
        try {
            do {
                flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
                System.out.println("flag == " + flag);
            } while (flag);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程池关闭成功。。。");
        System.out.println("当前线程:" + Thread.currentThread());
        System.out.println("共执行了:" + count.get());
    }
}

Executors 工具类

concurrent 包提供了 Executors 工具类,利用它可以创建各种不同类型的线程池。

方法 描述
newSingleThreadExecutor 单线程的线程池
newFixedThreadPool 固定数目线程的线程池
newCachedThreadPool 每接收一个请求,就创建一个线程来执行
newSingleThreadScheduledExecutor 单线程具有周期调度功能的线程池
newScheduledThreadPool 多线程,有调度功能的线程池

最佳实践

不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。

在《阿里巴巴 Java 开发手册》中,明确禁止使用 Executors 创建线程池,并要求开发者直接使用 ThreadPoolExectorScheduledThreadPoolExecutor 进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不当而造成资源耗尽的风险。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 实现了按时间调度来执行任务

方法 描述
schedule 延迟执行任务
scheduleAtFixedRate 周期执行任务,按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是 5s ,每 5s 执行一次任务,任务的执行时间必须小于 5s 。
scheduleWithFixedDelay 周期执行任务,按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是 10s ,间隔 2s ,则下一次开始执行的时间就是 12s 。
@Slf4j
public class ScheduledThreadPoolExecutorDemo {
    public static void main(String[] args) {
        log.info("main begin ...");
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);


        // executorService.scheduleWithFixedDelay(newRunnable("delay"), 1, 1, TimeUnit.SECONDS);
        executorService.scheduleAtFixedRate(newRunnable("rate"), 1, 3, TimeUnit.SECONDS);

        log.info("main end ...");
    }

    private static Runnable newRunnable(String str) {
        Runnable runnable = () -> {
            log.info(str + "  thread begin ...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info(str + "   thread end ...");
        };
        return runnable;
    }
}

CompletableFuture 用法

在 JDK 8 之前,异步编程可以通过线程池和 Future 来实现,但功能还不够强大。从 JDK 8 开始,在 Concurrent 包中提供了一个强大的异步编程工具 CompletableFuture

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future = new CompletableFuture();

        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 另一个线程执行任务,将结果赋值给future
                future.complete("hello lagou");
            }
        }.start();

        System.out.println("任务已经提交");

        // 阻塞的方法
        String result = future.get();
        System.out.println(result);

    }
}

CompletableFuture 实现了 Future 接口,所以它也具有 Future 的特性:调用 get() 方法会阻塞在那,直到结果返回。另外 1 个线程调用 complete 方法完成该 Future ,则所有阻塞在 get() 方法的线程都将获得返回结果。

runAsync 与 supplyAsync

runAsync

public class CompletableFutureDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 通过异步的方式给future指派任务,future没有返回值
        CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务执行完毕");
            }
        });

        Object o = future.get();
        System.out.println(o);  // null

    }
}

supplyAsync

public class CompletableFutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 指定future要执行的任务,同时future会有返回值
        CompletableFuture<String> future
                = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello lagou";
            }
        });

        String result = future.get();   // hello lagou
        System.out.println(result);

    }
}

CompletableFuture.runAsync 传入的是一个 Runnable 接口,没有返回值(返回值为 null )。

CompletableFuture.supplyAsync 传入的是一个 Supplier 接口,有返回值。

thenRun、thenAccept 和 thenApply

对于 Future ,在提交任务之后,只能调用 get() 等结果返回;但对于 CompletableFuture ,可以在结果上面再加一个 callback ,当得到结果之后,再接着执行 callback

  • thenRun(Runnable)
  • thenAccept(Consumer)
  • thenApply(Function)

三个例子都是在任务执行完成之后,接着执行回调,只是回调的形式不同:

  • thenRun 后面跟的是一个无参数、无返回值的方法,即 Runnable ,所以最终的返回值是 CompletableFuture<Void> 类型。
  • thenAccept 后面跟的是一个有参数、无返回值的方法,称为 Consumer ,返回值也是 CompletableFuture<Void> 类型。顾名思义,只进不出,所以称为 Consumer ;前面的 Supplier ,是无参数,有返回值,只出不进,和 Consumer 刚好相反。
  • thenApply 后面跟的是一个有参数、有返回值的方法,称为 Function 。返回值是 CompletableFuture<String> 类型。

而参数接收的是前一个任务,即 supplyAsync(...) 这个任务的返回值。因此这里只能用 supplyAsync ,不能用 runAsync 。因为 runAsync 没有返回值,不能为下一个链式方法传入参数。

thenCompose 与 thenCombine

在上面的例子中, thenApply 接收的是一个 Function ,但是这个 Function 的返回值是一个通常的基本数据类型或一个对象,而不是另外一个 CompletableFuture 。如果 Function 的返回值也是一个 CompletableFuture ,就会出现嵌套的 CompletableFuture

如果希望返回值是一个非嵌套的 CompletableFuture ,可以使用 thenCompose

@Slf4j
public class CompletableFutureDemo7 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("task 1");
                return "hello lagou";
            }
        }).thenCompose(new Function<String, CompletableFuture<Integer>>() {
            @Override
            public CompletableFuture<Integer> apply(String s) {
                log.info("task 2");
                return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        log.info("task 3");
                        return s.length();
                    }
                });
            }
        });

        log.info("task 4");
        log.info(String.valueOf(future.get()));

    }

    //    public static void main(String[] args) throws ExecutionException, InterruptedException {
    //        CompletableFuture<CompletableFuture<Integer>> future
    //                = CompletableFuture.supplyAsync(new Supplier<String>() {
    //            @Override
    //            public String get() {
    //                return "hello lagou";
    //            }
    //        }).thenApply(new Function<String, CompletableFuture<Integer>>() {
    //            @Override
    //            public CompletableFuture<Integer> apply(String s) {
    //                return CompletableFuture.supplyAsync(new Supplier<Integer>() {
    //                    @Override
    //                    public Integer get() {
    //                        return s.length();
    //                    }
    //                });
    //            }
    //        });
    //
    //        Integer integer = future.get().get();
    //        System.out.println(integer);
    //
    //    }
}

thenCombine 第 1 个参数是一个 CompletableFuture 类型,第 2 个参数是一个方法,并且是一个 BiFunction ,也就是该方法有 2 个输入参数, 1 个返回值。从该接口的定义可以大致推测,它是要在 2 个 CompletableFuture 完成之后,把 2 个 CompletableFuture 的返回值传进去,再额外做一些事情。

@Slf4j
public class CompletableFutureDemo8 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("task 1");
                return "hello";
            }
        }).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                log.info("task 2");
                return "lagou";
            }
        }), new BiFunction<String, String, Integer>() {
            @Override
            public Integer apply(String s, String s2) {
                log.info("task 3");
                System.out.println("s = " + s);
                System.out.println("s2 = " + s2);
                return s.length() + s2.length();
            }
        });

        System.out.println(future.get());
    }
}

allOf 和 anyOf ,任意个 CompletableFuture 的组合

allOf 的返回值是 CompletableFuture<Void> 类型,这是因为每个传入的 CompletableFuture 的返回值都可能不同,所以组合的结果是无法用某种类型来表示的,索性返回 Void 类型。

anyOf 的含义是只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,而无须像 allOf 那样,等待所有的 CompletableFuture 结束。但由于每个 CompletableFuture 的返回值类型都可能不同,任意一个,意味着无法判断是什么类型,所以 anyOf 的返回值是 CompletableFuture<Object> 类型。

public class CompletableFutureDemo9 {

    private static final Random RANDOM = new Random();
    private static volatile int result = 0;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture[] futures = new CompletableFuture[10];

        for (int i = 0; i < 10; i++) {
            CompletableFuture<Void> myFuture = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000 + RANDOM.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    result++;
                }
            });

            futures[i] = myFuture;
        }

       for (int i = 0; i < 10; i++) {
           futures[i].get();
           System.out.println(result);
       }

       CompletableFuture<Void> future = CompletableFuture.allOf(futures).thenRun(new Runnable() {
           @Override
           public void run() {
               System.out.println("计算完成");
           }
       });

       future.get();
       System.out.println(result);


        // CompletableFuture myfuture = CompletableFuture.anyOf(futures).thenRun(new Runnable() {
        //     @Override
        //     public void run() {
        //         System.out.println(result);     // 1
        //     }
        // });
        //
        // System.out.println(myfuture.get()); // null
    }
}

四种任务原型

通过上面的例子可以总结出,提交给 CompletableFuture 执行的任务有四种类型: RunnableConsumerSupplierFunction 。下面是这四种任务原型的对比:

接口 有无参数 有无返回值 提交方法
Runnable runAsyncthenRun
Consumer thenAccept
Supplier supplierAsync
Function thenApply

runAsyncsupplierAsyncCompletableFuture 的静态方法;而 thenAcceptthenApplyCompletableFutre 的成员方法。因为初始的时候没有 CompletableFuture 对象,也没有参数可传,所以提交的只能是 Runnable 或者 Supplier ,只能是静态方法;

通过静态方法生成 CompletableFuture 对象之后,便可以链式地提交其他任务了,这个时候就可以提交 RunnableConsumerFunction ,且都是成员方法。

CompletionStage 接口

CompletableFuture 不仅实现了 Future 接口,还实现了 CompletableStage 接口。

CompletionStage 接口定义的正是前面的各种链式方法、组合方法。

CompletableFuture 内部原理

CompletableFuture 的构造:ForkJoinPool

CompletableFuture 中任务的执行依靠 ForkJoinPool

任务类型的适配

ForkJoinPool 接受的任务是 ForkJoinTask 类型,而我们向 CompletableFuture 提交的任务是 Runnable/Supplier/Consumer/Function 。因此,肯定需要一个适配机制,把这四种类型的任务转换成 ForkJoinTask ,然后提交给 ForkJoinPool ,如下图所示:

20210614. 并发编程 - 拉勾教育

为了完成这种转换,在 CompletableFuture 内部定义了一系列的内部类

thenApply 与 thenApplyAsync 的区别
  • 如果上一个任务指定了线程池,thenApply 未指定线程池,thenApply 使用上一个任务指定的线程池
  • 如果上一个任务指定了线程池,thenApplyAsync 未指定线程池,thenApplyAsync 使用默认 ForkJoinPool 线程池

thenRunthenRunAsyncthenAcceptthenAcceptAsync 的区别同理

任务的网状执行:有向无环图

如果任务只是链式执行,便不需要在每个 CompletableFuture 里面设 1 个栈了,用 1 个指针使所有任务组成链表即可。但实际上,任务不只是链式执行,而是网状执行,组成 1 张图。

20210614. 并发编程 - 拉勾教育

ForkJoinPool

ForkJoinPool 用法

ForkJoinPool 就是 JDK 7 提供的一种“分治算法”的多线程并行计算框架。 Fork 意为分叉, Join 意为合并,一分一合,相互配合,形成分治算法。

此外,也可以将 ForkJoinPool 看作一个单机版的 Map/Reduce ,多个线程并行计算。

相比于 ThreadPoolExecutorForkJoinPool 可以更好地实现计算的负载均衡,提高资源利用率。假设有 5 个任务,在 ThreadPoolExecutor 中有 5 个线程并行执行,其中一个任务的计算量很大,其余 4 个任务的计算量很小,这会导致 1 个线程很忙,其他 4 个线程则处于空闲状态。利用 ForkJoinPool ,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

案例:求和,每次递归有返回值

@Slf4j
public class ForkJoinPoolDemo01 {
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10;
        private long start;
        private long end;

        public SumTask(long n) {
            this(1, n);
        }

        public SumTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            // 如果计算的范围在threshold之内,则直接进行计算
            if ((end - start) <= THRESHOLD) {
                for (long l = start; l <= end; l++) {
                    sum += l;
                }
                log.info("if start = {}, end = {}, sum = {}", start, end, sum);
            } else {
                // 否则找出起始和结束的中间值,分割任务
                long mid = (start + end) >>> 1;
                SumTask left = new SumTask(start, mid);
                SumTask right = new SumTask(mid + 1, end);
                left.fork();
                right.fork();
                // 收集子任务计算结果
                sum = left.join() + right.join();
                log.info("else start = {}, end = {}, sum = {}", start, end, sum);
            }

            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SumTask sum = new SumTask(30);
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> future = pool.submit(sum);
        Long aLong = future.get();
        log.info(aLong + "");
        pool.shutdown();
    }
}

案例:快排,每次递归没有返回值

public class ForkJoinPoolDemo02 {
    static class SortTask extends RecursiveAction {
        final long[] array;
        final int lo;
        final int hi;

        public SortTask(long[] array) {
            this.array = array;
            this.lo = 0;
            this.hi = array.length - 1;
        }

        public SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        private int partition(long[] array, int lo, int hi) {
            long x = array[hi];
            int i = lo - 1;
            for (int j = lo; j < hi; j++) {
                if (array[j] <= x) {
                    i++;
                    swap(array, i, j);
                }
            }
            swap(array, i + 1, hi);
            return i + 1;
        }

        private void swap(long[] array, int i, int j) {
            if (i != j) {
                long temp = array[i];
                array[i] = array[j];
                array[j] = temp;
            }
        }

        @Override
        protected void compute() {
            if (lo < hi) {
                // 找到分区的元素下标
                int pivot = partition(array, lo, hi);
                // 将数组分为两部分
                SortTask left = new SortTask(array, lo, pivot - 1);
                SortTask right = new SortTask(array, pivot + 1, hi);

                left.fork();
                right.fork();
                left.join();
                right.join();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        long[] array = {5, 3, 7, 9, 2, 4, 1, 8, 10};
        // 一个任务
        ForkJoinTask sort = new SortTask(array);
        // 一个pool
        ForkJoinPool pool = new ForkJoinPool();
        // ForkJoinPool开启多个线程,同时执行上面的子任务
        pool.submit(sort);
        // 结束ForkJoinPool
        pool.shutdown();
        // 等待结束Pool
        pool.awaitTermination(10, TimeUnit.SECONDS);

        System.out.println(Arrays.toString(array));
    }
}

核心数据结构

ThreadPoolExector 不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。

20210614. 并发编程 - 拉勾教育

public class ForkJoinPool extends AbstractExecutorService {
    
	volatile WorkQueue[] workQueues;
    volatile long ctl;
    
    static final class WorkQueue {
     	ForkJoinTask<?>[] array;   
    }
    
}

工作窃取队列

关于上面的全局队列,有一个关键点需要说明:它并非使用 BlockingQueue ,而是基于一个普通的数组得以实现。这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。

所谓工作窃取算法,是指一个 Worker 线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。

20210614. 并发编程 - 拉勾教育

这个队列只有如下几个操作:

  1. Worker 线程自己,在队列头部,通过对 top 指针执行加、减操作,实现入队或出队,这是单线程的。
  2. 其他 Worker 线程,在队列尾部,通过对 base 进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过 CAS 操作。

ForkJoinPool 状态控制

类似于 ThreadPoolExecutor ,在 ForkJoinPool 中也有一个 ctl 变量负责表达 ForkJoinPool 的整个生命周期和相关的各种状态。

阻塞栈 Treiber Stack

要实现多个线程的阻塞、唤醒,除了 park/unpark 这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。在 ForkJoinPool 中,没有使用阻塞队列,而是使用了阻塞栈。把所有空闲的 Worker 线程放在一个栈里面,这个栈同样通过链表来实现,名为 Treiber Stack 。

Worker 线程的阻塞-唤醒机制

ForkerJoinPool 没有使用 BlockingQueue ,也就不利用其阻塞 / 唤醒机制,而是利用了 park/unpark 原语,并自行实现了 Treiber Stack 。

ForkJoinPool 的优雅关闭

ThreadPoolExecutor 一样, ForkJoinPool 的关闭也不可能是“瞬时的”,而是需要一个平滑的过渡过程

shutdown() 只拒绝新提交的任务; shutdownNow() 会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。

多线程设计模式

Single Threaded Execution 模式

所谓 Single Threaded Execution 模式,指的是“以一个线程执行”。该模式用于设置限制,以确保同一时间只能让一个线程执行处理。

Single Threaded Execution 有时也称为临界区( critical section )或临界域( critical region )。 Single Threaded Execution 名称侧重于执行处理的线程,临界区或临界域侧重于执行范围。

Immutable 模式

Immutable 就是不变的、不发生改变。 Immutable 模式中存在着确保实例状态不发生改变的类。在访问这些实例时不需要执行耗时的互斥处理。如果能用好该模式,就可以提高程序性能。如 String 就是一个不可变类, immutable 的。

JDK 中的不可变模式:

  • java.lang.String
  • java.math.BigInteger
  • java.math.Decimal
  • java.util.regex.Pattern
  • java.lang.Boolean
  • java.lang.Byte
  • java.lang.Character
  • java.lang.Double
  • java.lang.Float
  • java.lang.Integer
  • java.lang.Long
  • java.lang.Short
  • java.lang.Void

Guarded Suspension 模式

Guarded 表示被守护、被保卫、被保护。 Suspension 表示暂停。如果执行现在的处理会造成问题,就让执行处理的线程进行等待——这就是 Guarded Suspension 模式。

Guarded Suspension 模式通过让线程等待来保证实例的安全型。 Guarded Suspension 也称为 guarded wait 、 spin lock 等名称。

可以将 Guarded Suspension 理解为多线程版本的 if 。

阻塞队列

Balking 模式

所谓 Balk ,就是停止并返回的意思。

Balking 模式与 Guarded Suspension 模式一样,也存在守护条件。在 Balking 模式中,如果守护条件不成立,则立即中断处理。而 Guarded Suspension 模式一直等待直到可以运行。

Balking 和 Guarded Suspension 模式之间:介于“直接 balk 并返回”和“等待到守护条件成立为止“这两种极端之间的还有一种”在守护条件成立之前等待一段时间“。在守护条件成立之前等待一段时间,如果到时条件还未成立,则直接 balk 。这种操作称为计时守护( guarded timed )或超时( timeout )。

java.util.concurrent 中的超时:

  • 通过异常通知超时

    当发生超时抛出异常时,不适合使用返回值表示超时,需要使用 java.util.concurrent.TimeoutException 异常。如:

    • java.util.concurrent.Futureget 方法
    • java.util.concurrent.Exchangerexchange 方法
    • java.util.concurrent.Cyclicarrierawait 方法
    • java.util.concurrent.CountDownLatchawait 方法
  • 通过返回值通知超时

    当执行多次 try 时,则不使用异常,而使用返回值表示超时。如:

    • java.util.concurrent.BlockingQueue 接口,当 offer 方法的返回值为 false ,或 poll 方法的返回值为 null ,表示发生了超时。
    • java.util.concurrent.Semaphore 类,当 tryAcquire 方法的返回值为 false 时,表示发生了超时。
    • java.util.concurrent.locks.Lock 接口,当 tryLock 方法的返回值为 false 时,表示发生了超时。

Producer-Consumer 模式

生产者安全地将数据交给消费者。当生产者和消费者以不同的线程运行时,两者之间的处理速度差异会有问题。生产者消费者模式用于消除线程间处理速度的差异带来的问题。在该模式中,生产者和消费者都有多个,当生产者和消费者只有一个时,我们称为管道( Pipe )模式。

JUC 包和 Producer-Consumer 模式:JUC 中提供了 BlockingQueue 接口及其实现类,相当于 Producer-Consumer 模式中的 Channel 角色。

  • BlockingQueue 接口 —— 阻塞队列
  • ArrayBlockingQueue —— 基于数组的 BlockingQueue
  • LinkedBlockingQueue —— 基于链表的 BlockingQueue
  • PriorityBlockingQueue —— 带有优先级的 BlockingQueue
  • DelayQueue —— 一定时间之后才可以 takeBlockingQueue
  • SynchronousQueue —— 直接传递的 BlockingQueue
  • ConcurrentLinkedQueue —— 元素个数没有最大限制的线程安全队列

Read-Write Lock 模式

当线程读取实例的状态时,实例的状态不会发生变化。实例的状态仅在线程执行写入操作时才会发生变化。

从实例状态变化来看,读取和写入有本质的区别。

在本模式中,读取操作和写入操作分开考虑。在执行读取操作之前,线程必须获取用于读取的锁。在执行写入操作之前,线程必须获取用于写入的锁。

可以多个线程同时读取,读取时不可写入。

当线程正在写入时,其他线程不可以读取或写入。

一般来说,执行互斥会降低程序性能。如果把写入的互斥和读取的互斥分开考虑,则可以提高性能。

JUC 包和 Read-Write Lock 模式

java.util.concurrent.locks 包提供了已实现 Read-Write Lock 模式的 ReadWriteLock 接口和 ReentrantReadWriteLock 类。

java.util.concurrent.locks.ReadWriteLock 接口的功能和当前案例中的 ReadWriteLock 类类似。不同之处在于该接口用于读取的锁和用于写入的锁是通过其他对象来实现的。

当前案例的 ReadWriteLock java.util.concurrent.locks 中的 ReadWriteLock 接口
readLock() readLock().lock()
readUnlock() readLock().unlock()
writeLock() writeLock().lock()
writeUnlock() writeLock().unlock()

Thread-Per-Message 模式

该模式可以理解为“每个消息一个线程”。消息这里可以理解为命令或请求。每个命令或请求分配一个线程,由这个线程来处理。这就是 Thread-Per-Message 模式。

在 Thread-Per-Message 模式中,消息的委托方和执行方是不同的线程。

JUC 包和 Thread-Per-Message 模式

  • java.lang.Thread 类:最基本的创建、启动线程的类
  • java.lang.Runnable 接口:线程锁执行的任务接口
  • java.util.concurrent.ThreadFactory 接口:将线程创建抽象化的接口
  • java.util.concurrent.Executors :用于创建实例的工具类
  • java.util.concurrent.Executor 接口:将线程执行抽象化的接口
  • java.util.concurrent.ExecutorService 接口:将被复用的线程抽象化的接口
  • java.util.concurrent.ScheduledExecutorService 类:将被调度线程的执行抽象化的接口

Worker Thread 模式

在 Worker Thread 模式中,工人线程( worker thread )会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。 Worker Thread 模式也被称为 Background Thread 模式。有时也称为 Thread Pool 模式。

JUC包和Worker Thread模式

  1. ThreadPoolExecutor

    java.util.concurrent.ThreadPoolExecutor 类是管理 Worker 线程的类。可以轻松实现 Worker Thread 模式。

  2. 通过 java.util.concurrent 包创建线程池

    java.util.concurrent.Executors 类就是创建线程池的工具类。

Future 模式

Future 的意思是未来。假设由一个方法需要长时间执行才能获取结果,则一般不会让调用的程序等待,而是先返回给它一张“提货卡”。获取提货卡并不消耗很多时间。该“提货卡”就是 Future 角色。获取 Future 角色的线程稍后使用 Future 角色来获取运行结果。

JUC 包与 Future 模式

java.util.concurrent 包提供了用于支持 Future 模式的类和接口。

java.util.concurrent.Callable 接口将“返回值的某种处理调用”抽象化了。Callable 接口声明了 call 方法。 call 方法类似于 Runnablerun 方法,但是 call 方法有返回值。 Callable<String> 表示 Callable 接口的 call 方法返回值类型为 String 类型。

java.util.concurrent.Future 接口相当于本案例中的 Future 角色。Future 接口声明了 get 方法来获取结果,但是没有声明设置值的方法。设置值的方法需要在 Future 接口的实现类中声明。 Future<String> 表示“ Future 接口的 get 方法返回值类型是 String 类型”。除了 get 方法, Future 接口还声明了用于中断运行的 cancel 方法。

java.util.concurrent.FutureTask 类是实现了 Future 接口的标准类。FutureTask 类声明了用于获取值的 get 方法、用于中断运行的 cancel 方法、用于设置值的 set 方法,以及用于设置异常的 setException 方法。由于 FutureTask 类实现了 Runnable 接口,还声明了 run 方法。

上一篇:Java 8 - CompletableFuture组合式异步编程


下一篇:java8中CompletableFuture的使用介绍