【深入浅出多线程】无锁编程

多线程编程中,锁是大家比较熟悉的概念,但对无锁编程则没有太多了解。无锁编程是指不用加锁的方式去解决原本需要加锁才能解决的问题,从而使程序具有更高的性能,降低硬件成本。我们从并发开始说起。

一、并发相关概念

并发数:服务器同时并行处理的请求数量。

QPS:每秒处理完成的请求数量,是衡量系统吞吐量的一种方式。例如:Tomcat接收了200个请求,但1秒内处理完毕的请求为20个,则QPS为20。

高并发:高并发是一个相对的概念,假设机器配置差劲(例如1核1G),如果在这个机器上部署一个项目,这时200个请求就已经到达该机器的上限了,那么这时面临的场景就是高并发的场景。处理高并发并不一定是并发量非常高,我们才会用到这样的技术。可能你们公司的并发量只有1000,15万的用户量,依然可能会用到高并发的技术,为什么——降低成本(用户少,公司没钱__)。

二、并发下的原子操作

Oricle官网上Java入门手册中关于原子操作(Atomic Variables)的定义如下:

In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn‘t happen at all. No side effects of an atomic action are visible until the action is complete.

原子操作可以是一个步骤或多个操作步骤,要么完全发生,要么根本不发生,不能停在中间状态。将整个操作视作一个整体,资源在该次操作中保持一致,这是线程安全中原子性的核心特征。

三、并发的原子性问题

来看一个计数器的例子:

class Counter {
    
    private int i = 0;

    public void increment() {
        i++;
    }

    public void decrement() {
        i--;
    }

    public int value() {
        return i;
    }
}

我们为它配上启动器,启动器中开启两个线程,每个线程执行10000次increment()函数调用,所以执行完预期计数器结果为20000。

public class Starter {
    public static void main(String[] args) throws InterruptedException {
        var counter = new Counter();
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    counter.increment();
                }
            }).start();
        }

        Thread.sleep(5000);
        System.out.println(counter.value());
    }
}

运行结果一般是不正确的。

【深入浅出多线程】无锁编程

这显然是一个不正确的结果,为什么不正确?——i++并没有保证原子性。我们可以通过分析JVM执行的字节码(byte code)来一探究竟(IDEA可以通过安装插件来实现,具体参考:IDEA下查看Java字节码插件):

 0 aload_0
 1 dup
 2 getfield #2 <Counter.i : I>
 5 iconst_1
 6 iadd
 7 putfield #2 <Counter.i : I>
10 return

可以看到字节码包括以下关键的三个步骤:

  1. 取值:2 getfield #2 <Counter.i : I>
  2. 相加: 6 iadd
  3. 放回:7 putfield #2 <Counter.i : I>

接下来结合多线程来进行分析为什么i++没有实现原子性。

【深入浅出多线程】无锁编程

做了两次计算,得出同样的值,很明显这不是我们期望的结果,不符合我们编写程序的期望——操作失败。这就是原子性问题的来源。

这里举一个计数器的应用——限流。限流的目的是超过一定数量的请求,不予操作。例如,有一个Controller方法:

@RestController
class XXXController {
    final Counter counter = new Counter();
    
    @GetController(...)
	public void index() {
        // 统计正在处理的请求数量
        counter.increase();
        try {
            if (counter.value() > 3000) {
                return;
            }
            // TODO: 很多业务逻辑,消耗CPU、内存,资源有限
        } finally {
            counter.decrease();
        }
    }
}

类似的限流在RedisSpringCloudNginx、微服务、网关中都有,其最核心技术一定是计数器。

三、并发控制策略

并发导致了上述问题,我们需要控制并发,以达到我们想要的效果。并发控制有两种策略:悲观策略(Pessimistic concurrency control)乐观策略(Optimistic concurrency control)。这是两种解决并发问题的思路。

【深入浅出多线程】无锁编程

悲观策略是一种悲观的思想,即认为写较多,遇到并发的可能性高,每次拿数据时,都会认为别人会修改数据,所以同一时间只允许一个线程执行。

【深入浅出多线程】无锁编程

乐观策略是一种乐观的思想,即认为读多写少,遇到并发的可能性低,每次拿数据时都认为别人不会修改,所以不会上锁。但是在更新的时候会判断判断值有没有发生变化,如果发生变化了,意味着其他线程已经更新,则此次计算更新失效,需要重新取值计算。

适用场景:

  1. 重试成本小;
  2. 并发数量小;

无锁编程要先了解有锁编程的问题。

四、锁带来的问题

按照悲观策略的思想,我们当然可以通过加锁的方式解决并发不一致问题,计数器例子的加锁版本代码如下:

class SynchronizedCounter {
    private int i = 0;

    public synchronized void increment() {
        i++;
    }

    public synchronized void decrement() {
        i--;
    }

    public synchronized int value() {
        return i;
    }
}

但是加锁存在一些问题:

【深入浅出多线程】无锁编程

我们知道,线程的数量一定的远远大于CPU数量的,如上图所示,CPU只有3个,而线程数则可能成百上千。如何把这么多的线程分配到有限的CPU上,这就是操作系统调度程序做的事情。JVM进程中的线程也是由操作系统中的调度器负责将其调度到指定的CPU上执行。

再来分析上面加锁版本的计数器。线程-1和线程-2都在抢锁,假设线程-1抢到了锁,线程2没有抢到锁,线程2变为阻塞状态。那么,下一次线程-2什么时候?可能会在1ms之后,也可能10ms之后执行。也就是说,由于操作系统调度存在,所以这是不确定的。线程-1执行完毕。

假设执行一次i++需要0.05ms,则可能会发生以下场景:

? 第一次i++ 0.05ms

? 线程切换 --- 10ms

? 第二次i++ 0.05ms

这就是说,每发生一次枪锁失败导致的线程切换,都会增加一定的线程切换时间、等待时间,从而导致请求的处理时间变长,降低了系统的吞吐量。

五、无锁编程实现

通过分析计数器例子中计数器累加操作i++,我们发现:

  1. 读取值:没有问题——不会破坏数据的状态;
  2. 计算值:没有问题——不会破坏数据的状态;
  3. 赋值:会修改内存数据,导致数据结果不符合程序的原子性要求——会破坏数据的状态,即:存在并发安全问题

按照乐观并发控制策略,假设不存在多线程竞争,我们不进行加锁操作,那么第1步和第2步都没有问题。第3步,写结果到内存时,需要判断在第1步和第2步中,刚才的假设是否成立,如果成立,则直接将结果写入内存,否则刚才计算结果不正确,需要重新计算。也就是说,最后一步是无锁编程的核心,如何确保判断并根据结果写回内存这一操作具备原子性?

这需要硬件的支持。

最后一步判断并根据结果写回内存本质是操作内存中一个值,而内存正好提供了这么一种机制——CAS(Compare and swap)机制,属于硬件同步原语。该方法需要三个参数:内存地址、当前值A、新值B。

CAS的具体操作:先比较输入的当前值和内存中的值是否一致,不一致则代表内存数据发生了变化,CAS失败。如果输入的旧值A和内存中的值一致,则将值A替换为值B,CAS操作成功。

【深入浅出多线程】无锁编程

Java提供了一种直接操作内存的工具类sun.misc.Unsafe,顾名思义,这个类对于普通程序员来说是”危险“的,一般应用开发者不会用到这个类。在Unsafe类中就有一系列的compareAndSwapXXX()方法,例如,本文要用到的compareAndSwapInt函数:

@ForceInline
public final boolean compareAndSwapInt(Object o, long offset,
                                       int expected,
                                       int x) {
    return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);
}

这个函数的四个参数分别代表要操作的对象、对象属性相对该对象的偏移量、期望内存中的值以及计算后的值。参数1、3、4都很容易获取,第2个成员变量相对该对象的偏移量怎么获取?我们也可以通过这个工具类获取:

@ForceInline
public long objectFieldOffset(Field f) {
    return theInternalUnsafe.objectFieldOffset(f);
}

最后我们要调用这些方法,我们首先要获取Unsafe的实例变量。然而,这个类的构造函数时私有的,应用程序也无法通过其静态成员方法getUnsafe()获取其实例。那么,怎么获取Unsafe的实例呢?这就要使用强大的反射机制了(反射就像一面镜子,我们往JVM中放了什么东西,我们就可以从JVM里获取到这些东西,也许这是反射名字的由来吧!)。

public static Unsafe getUnsafe() throws IllegalAccessException {
    Field unsafeField = Unsafe.class.getDeclaredFields()[0];
    unsafeField.setAccessible(true);
    return (Unsafe) unsafeField.get(null);
}

最后,基于CAS机制的Counter实现如下:

public class CASCounter {
    private int i = 0;

    private static Unsafe unsafe;	// Unsafe实例
    private static long offset;		// i相对CASCounter对象实例的偏移

    static {
        try {
            // 通过反射机制获取Unsafe实例
            var unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (Unsafe) unsafeField.get(null);

            // 获取i相对CASCounter对象实例的偏移
            Field fc = CASCounter.class.getDeclaredField("i");
            offset = unsafe.objectFieldOffset(fc);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public void increment() {
        while (true) {
            int currentValue = i;
            int newValue = currentValue + 1;
            // 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)
            if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
                return;
            }
        }
    }

    public void decrement() {
        while (true) {
            int currentValue = i;
            int newValue = currentValue - 1;
            // 原子方式的比较写回操作,如果成功则返回,否则重新获取并计算(重试)
            if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
                return;
            }
        }
    }

    public int value() {
        return i;
    }
}

六、性能比较

我们在启动器开启的线程中加入计时逻辑,如下:

for (int i = 0; i < 2; i++) {
    new Thread(() -> {
        var begin = System.nanoTime();
        for (int j = 0; j < 10000; j++) {
            counter.increment();
        }
        System.out.println(System.nanoTime() - begin);
    }).start();
}

运行结果如下:

【深入浅出多线程】无锁编程

使用SynchronizeCounter的运行结果

【深入浅出多线程】无锁编程

使用CASCounter的运行结果

可以看到两者性能差距在10倍以上!

【深入浅出多线程】无锁编程

上一篇:GIN转换UTC时间


下一篇:低代码在这些方面充满优势