多线程编程中,锁是大家比较熟悉的概念,但对无锁编程则没有太多了解。无锁编程是指不用加锁的方式去解决原本需要加锁才能解决的问题,从而使程序具有更高的性能,降低硬件成本。我们从并发开始说起。
一、并发相关概念
并发数:服务器同时并行处理的请求数量。
QPS:每秒处理完成的请求数量,是衡量系统吞吐量的一种方式。例如:Tomcat接收了200个请求,但1秒内处理完毕的请求为20个,则QPS为20。
高并发:高并发是一个相对的概念,假设机器配置差劲(例如1核1G),如果在这个机器上部署一个项目,这时200个请求就已经到达该机器的上限了,那么这时面临的场景就是高并发的场景。处理高并发并不一定是并发量非常高,我们才会用到这样的技术。可能你们公司的并发量只有1000,15万的用户量,依然可能会用到高并发的技术,为什么——降低成本(用户少,公司没钱__)。
二、并发下的原子操作
Oricle官网上Java入门手册中关于原子操作(Atomic Variables)的定义如下:
In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn‘t happen at all. No side effects of an atomic action are visible until the action is complete.
原子操作可以是一个步骤或多个操作步骤,要么完全发生,要么根本不发生,不能停在中间状态。将整个操作视作一个整体,资源在该次操作中保持一致,这是线程安全中原子性的核心特征。
三、并发的原子性问题
来看一个计数器的例子:
class Counter {
private int i = 0;
public void increment() {
i++;
}
public void decrement() {
i--;
}
public int value() {
return i;
}
}
我们为它配上启动器,启动器中开启两个线程,每个线程执行10000次increment()
函数调用,所以执行完预期计数器结果为20000。
public class Starter {
public static void main(String[] args) throws InterruptedException {
var counter = new Counter();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
counter.increment();
}
}).start();
}
Thread.sleep(5000);
System.out.println(counter.value());
}
}
运行结果一般是不正确的。
这显然是一个不正确的结果,为什么不正确?——i++
并没有保证原子性。我们可以通过分析JVM执行的字节码(byte code)来一探究竟(IDEA可以通过安装插件来实现,具体参考:IDEA下查看Java字节码插件):
0 aload_0
1 dup
2 getfield #2 <Counter.i : I>
5 iconst_1
6 iadd
7 putfield #2 <Counter.i : I>
10 return
可以看到字节码包括以下关键的三个步骤:
- 取值:
2 getfield #2 <Counter.i : I>
- 相加:
6 iadd
- 放回:
7 putfield #2 <Counter.i : I>
接下来结合多线程来进行分析为什么i++
没有实现原子性。
做了两次计算,得出同样的值,很明显这不是我们期望的结果,不符合我们编写程序的期望——操作失败。这就是原子性问题的来源。
这里举一个计数器的应用——限流。限流的目的是超过一定数量的请求,不予操作。例如,有一个Controller方法:
@RestController
class XXXController {
final Counter counter = new Counter();
@GetController(...)
public void index() {
// 统计正在处理的请求数量
counter.increase();
try {
if (counter.value() > 3000) {
return;
}
// TODO: 很多业务逻辑,消耗CPU、内存,资源有限
} finally {
counter.decrease();
}
}
}
类似的限流在Redis
、SpringCloud
、Nginx
、微服务、网关中都有,其最核心技术一定是计数器。
三、并发控制策略
并发导致了上述问题,我们需要控制并发,以达到我们想要的效果。并发控制有两种策略:悲观策略(Pessimistic concurrency control)和乐观策略(Optimistic concurrency control)。这是两种解决并发问题的思路。
悲观策略是一种悲观的思想,即认为写较多,遇到并发的可能性高,每次拿数据时,都会认为别人会修改数据,所以同一时间只允许一个线程执行。
乐观策略是一种乐观的思想,即认为读多写少,遇到并发的可能性低,每次拿数据时都认为别人不会修改,所以不会上锁。但是在更新的时候会判断判断值有没有发生变化,如果发生变化了,意味着其他线程已经更新,则此次计算更新失效,需要重新取值计算。
适用场景:
- 重试成本小;
- 并发数量小;
无锁编程要先了解有锁编程的问题。
四、锁带来的问题
按照悲观策略的思想,我们当然可以通过加锁的方式解决并发不一致问题,计数器例子的加锁版本代码如下:
class SynchronizedCounter {
private int i = 0;
public synchronized void increment() {
i++;
}
public synchronized void decrement() {
i--;
}
public synchronized int value() {
return i;
}
}
但是加锁存在一些问题:
我们知道,线程的数量一定的远远大于CPU数量的,如上图所示,CPU只有3个,而线程数则可能成百上千。如何把这么多的线程分配到有限的CPU上,这就是操作系统调度程序做的事情。JVM进程中的线程也是由操作系统中的调度器负责将其调度到指定的CPU上执行。
再来分析上面加锁版本的计数器。线程-1和线程-2都在抢锁,假设线程-1抢到了锁,线程2没有抢到锁,线程2变为阻塞状态。那么,下一次线程-2
什么时候?可能会在1ms之后,也可能10ms之后执行。也就是说,由于操作系统调度存在,所以这是不确定的。线程-1
执行完毕。
假设执行一次i++
需要0.05ms,则可能会发生以下场景:
? 第一次i++ 0.05ms
? 线程切换 --- 10ms
? 第二次i++ 0.05ms
这就是说,每发生一次枪锁失败导致的线程切换,都会增加一定的线程切换时间、等待时间,从而导致请求的处理时间变长,降低了系统的吞吐量。
五、无锁编程实现
通过分析计数器例子中计数器累加操作i++
,我们发现:
- 读取值:没有问题——不会破坏数据的状态;
- 计算值:没有问题——不会破坏数据的状态;
- 赋值:会修改内存数据,导致数据结果不符合程序的原子性要求——会破坏数据的状态,即:存在并发安全问题
按照乐观并发控制策略,假设不存在多线程竞争,我们不进行加锁操作,那么第1步和第2步都没有问题。第3步,写结果到内存时,需要判断在第1步和第2步中,刚才的假设是否成立,如果成立,则直接将结果写入内存,否则刚才计算结果不正确,需要重新计算。也就是说,最后一步是无锁编程的核心,如何确保判断并根据结果写回内存这一操作具备原子性?
这需要硬件的支持。
最后一步判断并根据结果写回内存本质是操作内存中一个值,而内存正好提供了这么一种机制——CAS(Compare and swap)机制,属于硬件同步原语。该方法需要三个参数:内存地址、当前值A、新值B。
CAS的具体操作:先比较输入的当前值和内存中的值是否一致,不一致则代表内存数据发生了变化,CAS失败。如果输入的旧值A和内存中的值一致,则将值A替换为值B,CAS操作成功。
Java提供了一种直接操作内存的工具类sun.misc.Unsafe
,顾名思义,这个类对于普通程序员来说是”危险“的,一般应用开发者不会用到这个类。在Unsafe
类中就有一系列的compareAndSwapXXX()
方法,例如,本文要用到的compareAndSwapInt
函数:
@ForceInline
public final boolean compareAndSwapInt(Object o, long offset,
int expected,
int x) {
return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);
}
这个函数的四个参数分别代表要操作的对象、对象属性相对该对象的偏移量、期望内存中的值以及计算后的值。参数1、3、4都很容易获取,第2个成员变量相对该对象的偏移量怎么获取?我们也可以通过这个工具类获取:
@ForceInline
public long objectFieldOffset(Field f) {
return theInternalUnsafe.objectFieldOffset(f);
}
最后我们要调用这些方法,我们首先要获取Unsafe
的实例变量。然而,这个类的构造函数时私有的,应用程序也无法通过其静态成员方法getUnsafe()
获取其实例。那么,怎么获取Unsafe
的实例呢?这就要使用强大的反射机制了(反射就像一面镜子,我们往JVM中放了什么东西,我们就可以从JVM里获取到这些东西,也许这是反射名字的由来吧!)。
public static Unsafe getUnsafe() throws IllegalAccessException {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
return (Unsafe) unsafeField.get(null);
}
最后,基于CAS机制的Counter实现如下:
public class CASCounter {
private int i = 0;
private static Unsafe unsafe; // Unsafe实例
private static long offset; // i相对CASCounter对象实例的偏移
static {
try {
// 通过反射机制获取Unsafe实例
var unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
unsafe = (Unsafe) unsafeField.get(null);
// 获取i相对CASCounter对象实例的偏移
Field fc = CASCounter.class.getDeclaredField("i");
offset = unsafe.objectFieldOffset(fc);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
public void increment() {
while (true) {
int currentValue = i;
int newValue = currentValue + 1;
// 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)
if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
return;
}
}
}
public void decrement() {
while (true) {
int currentValue = i;
int newValue = currentValue - 1;
// 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)
if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
return;
}
}
}
public int value() {
return i;
}
}
六、性能比较
我们在启动器开启的线程中加入计时逻辑,如下:
for (int i = 0; i < 2; i++) {
new Thread(() -> {
var begin = System.nanoTime();
for (int j = 0; j < 10000; j++) {
counter.increment();
}
System.out.println(System.nanoTime() - begin);
}).start();
}
运行结果如下:
使用SynchronizeCounter
的运行结果
使用CASCounter
的运行结果
可以看到两者性能差距在10倍以上!