浅谈Netty中的FastThreadLocal的优势和实现

目录

FastThreadLocal为什么比ThreadLocal快

ThreadLocal

ThreadLocalMap

线性探测法

FastThreadLocal

FastThreadLocalThread

InternalThreadLocalMap

 FastThreadLocal.set方法

FastThreadLocal.get方法 


 

基于FastThreadLocalThread去使用FastThreadLocal时的效率要高于JDK的Thread使用ThreadLocal。

FastThreadLocal为什么比ThreadLocal快

FastThreadLocal与ThreadLocal内部存储结构不同,FastThreadLocal基于数据的存储形式,相对ThreadLocal来说站空间多,但是却使得查找效率更高,而ThreadLocal基于hash表,当存在Hash冲突时基于线性表的查找效率显然不如数组索引查找。

ThreadLocal

首先来回顾一下ThreadLocal。在JDK中提供了ThreadLocal用于存储线程私有数据。为了避免加锁,实现上以 Thread 入手,在 Thread 中维护一个 Map,记录 ThreadLocal 与实例之间的映射关系,这样在同一个线程内,Map 就不需要加锁了。

ThreadLocalMap

ThreadLocal内部存储是基于ThreadLocalMap的,ThreadLocalMap 其实与 HashMap 的数据结构类似,是一种使用线性探测法实现的哈希表,底层采用数组存储数据。ThreadLocalMap 会初始化一个长度为 16 的 Entry 数组,每个 Entry 对象用于保存 key-value 键值对。与 HashMap 不同的是,Entry 的 key 就是 ThreadLocal 对象本身,value 就是用户具体需要存储的值。

当调用 ThreadLocal.set() 添加 Entry 对象时,是如何解决 Hash 冲突的呢?下面是答案

线性探测法

每个 ThreadLocal 在初始化时都会有一个 Hash 值为 threadLocalHashCode,每增加一个 ThreadLocal, Hash 值就会固定增加一个魔术 HASH_INCREMENT = 0x61c88647。为什么取 0x61c88647 这个魔数呢?实验证明,通过 0x61c88647 累加生成的 threadLocalHashCode 与 2 的幂取模,得到的结果可以较为均匀地分布在长度为 2 的幂大小的数组中。

ThreadLocal.get() 的过程也是类似的,也是根据 threadLocalHashCode 的值定位到数组下标,然后判断当前位置 Entry 对象与待查询 Entry 对象的 key 是否相同,如果不同,继续向下查找。由此可见,ThreadLocal.set()/get() 方法在数据密集时很容易出现 Hash 冲突,需要 O(n) 时间复杂度解决冲突问题,效率较低。

FastThreadLocal

FastThreadLocal 的实现与 ThreadLocal 非常类似,Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。

FastThreadLocalThread

FastThreadLocalThread 是对 Thread 类的一层包装,每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时,才能发挥 FastThreadLocal 的性能优势。FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap。

InternalThreadLocalMap

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。

浅谈Netty中的FastThreadLocal的优势和实现

 FastThreadLocal.set方法

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMap
        setKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value
    } else {
        remove();

    }
}

FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  1. 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。

  2. 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。

  3. 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。 

首先我们看下 InternalThreadLocalMap.get() 方法.

  1. 如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。
  2. 如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 
    InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

 下面使用一幅图描述 InternalThreadLocalMap 的获取方式浅谈Netty中的FastThreadLocal的优势和实现

下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

setKnownNotUnset() 主要做了两件事:

  1. 找到数组下标 index 位置,设置新的 value。

  2. 将 FastThreadLocal 对象保存到待清理的 Set 中。

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // 1. 找到数组下标 index 位置,设置新的 value
    if (threadLocalMap.setIndexedVariable(index, value)) { 
    // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中
        addToVariablesToRemove(threadLocalMap, this); 
    }
}

首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码 

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index]; 
        lookup[index] = value; // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value); // 容量不够,先扩容再设置值
        return true;
    }
}

indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑.

如果看过HashMap 中扩容的源码I就会发现,nternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的 。InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。为什么 InternalThreadLocalMap 以 index 为基准进行扩容,而不是原数组长度呢?假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。 

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    // 获取数组下标为 0 的元素
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
    // 创建 FastThreadLocal 类型的 Set 集合
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); 
    // 将 Set 集合填充到数组下标 0 的位置
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); 
    } else {
    // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
        variablesToRemove = (Set<FastThreadLocal<?>>) v; 
    }
    // 将 FastThreadLocal 添加到 Set 集合中
    variablesToRemove.add(variable); 
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。 

FastThreadLocal.get方法 

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // 从数组中取出 index 位置的元素
    Object v = threadLocalMap.indexedVariable(index); 
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // 如果获取到的数组元素是缺省对象,执行初始化操作
    return initialize(threadLocalMap); 
}

public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据,如下所示。

private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
    @Override
    protected String initialValue() {
        return "hello world";
    }
};

构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过,就不再赘述了。 

上一篇:java 从零开始手写 RPC (07)-timeout 超时处理


下一篇:Netty框架编解码之MessagePack框架应用