Java本地线程变量ThreadLocal的神秘面纱

这玩意应该没有没用过的人了吧!

ThreadLocal

作用:线程隔离

1.提供的方法

  • get()
  • set()
  • remove()

2.java中使用

static ThreadLocal<ReqData> threadLocal = new ThreadLocal<ReqData>(){
    @Override
    protected ReqData initialValue() {
        //重写initialValue方法 返回当前线程存储的变量对象
        return new ReqData();
    }
};
//java8 语法
static ThreadLocal<ReqData> threadLocal = ThreadLocal.withInitial(() -> new ReqData());

public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            threadLocal.get().setName(Thread.currentThread().getName());
            threadLocal.get().setReqId(UUID.randomUUID().toString());
            System.out.println(threadLocal.get().toString());
        }).start();
    }
}

@Data
static class ReqData {
    String name;
    String reqId;
}
//输出
ThreadLocalDemo.ReqData(name=Thread-2, reqId=df3e550f-48dd-49fa-99b5-8889058cfad1)
ThreadLocalDemo.ReqData(name=Thread-0, reqId=b81a34e7-3507-445c-81ca-bea07b6e4626)
ThreadLocalDemo.ReqData(name=Thread-1, reqId=6aaa376c-1379-4cbf-a9ad-d509ff4aec9b)

3.源码分析

在看源码之前先了解线性探测是个什么东西?

线性探测是计算机程序解决散列表冲突时所采取的一种策略
为了搜索给定的键 x,散列表中由h(x)对应的单元开始的相邻单元h(x) + 1,h(x) + 2, …, 都将被检查,直到找到了内容为空的单元或是找到了存储给定键为x的单元。其中,h是散列函数。如果找到了存储给定键的单元,搜索将会返回单元中存储的键对应的值。

简单来说就是在插入数组下标X的时候,当下标X的key值与插入的key值一致时,可能会直接覆盖,当不一致时,就从相邻的下标X+1、X+2 ...都将会被检测,直到找到内容为空或者key一致时才会在当前下标的空间存入

  • 底层数据结构,每个线程都有独立的Entry空间
  • key:Threadlocal的引用值的hashcode,如果出现hash碰撞,需要线性计算出新的位置并移除脏数据
  • vaule:对象
  • 扩容:生成一个新的数组,把原有数组拷贝过去

3.1 initialValue || withInitial 初始化

static ThreadLocal<ReqData> threadLocal = new ThreadLocal<ReqData>(){
    @Override
    protected ReqData initialValue() { //重写了initialValue方法 返回线程变量对象
        return new ReqData();
    }
};

3.2 初始化以及结构

ThreadLocal.ThreadLocalMap

//WeakReference 弱引用 用于释放线程
static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    //我们自定义的存储对象
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}
//核心结构 初始size16的数组
private Entry[] table;

3.3 set()方法

public void set(T value) {
    Thread t = Thread.currentThread();
    //每个线程里面都有一个ThreadLocal.ThreadLocalMap threadLocals;
	//获取到当前线程的ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
    	//懒加载形式 后面有介绍
    	//t.threadLocals = new ThreadLocalMap(this, firstValue);
        createMap(t, value);
}

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    //通过hash计算出数组下标
    int i = key.threadLocalHashCode & (len-1);
    //这里采用的主要是线性探索 找到对应位置的Entry不为空才进for循环
    //1.Entry中的Key和当前传入的key相同 则直接覆盖
    //2.如果Entry中的key为空(ThreadLocal弱引用) 需要清理脏Entry 同时也会进行赋值处理
    //没有return的则继续往下一个下标查找 
    //nextIndex -> ((i + 1 < len) ? i + 1 : 0); //往下标位置每次加1
    //prevIndex -> ((i - 1 >= 0) ? i - 1 : len - 1); //往下标位置每次减1
    for (Entry e = tab[i];
         e != null;
         //线性探索 i会被赋值
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
		//当前位置不为空 key值和当前线程key相等 那就直接覆盖
        if (k == key) {
            e.value = value;
            return;
        }
		//如果为空 则从当前下标进行清理 并赋值
        //replaceStaleEntry -> 先从i数组位置往前清理 然后再往后清理脏数据 再把当前key value存入
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
	//数组下标位置没被占的则直接赋值
    tab[i] = new Entry(key, value);
    int sz = ++size;
    //先走一遍当前下标到数组size+1的脏数据清理 会返回是否有清除元素
    //如果没有可清除的元素,说明从当前节点到数组长度都是存在值的 则考虑是否扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

3.4 get()方法

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //从Entry数组中获取Entry
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    //map 为空 还没初始化
    return setInitialValue();
}
  • getEntry()
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    //能直接找到的话 直接return
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

//线性往下查找 当前位置的Entry 与当前key不相等 则继续往下查找 不懂的看set线性添加
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();
        if (k == key)
            return e;
        if (k == null)
        	//从当前下标位置往后进行脏数据清理
            expungeStaleEntry(i);
        else
        	//否则就下标递增
            i = nextIndex(i, len);
        //经过上面的清理 如果为空了就跳出循环    
        e = tab[i];
    }
    return null;
}
  • setInitialValue 初始化 ThreadLocal
private T setInitialValue() {
    //调用到我们自己实现的代码 返回一个初始化对象
    T value = initialValue();
    Thread t = Thread.currentThread();
    //由于当前是不是原子操作 再获取一次
    ThreadLocalMap map = getMap(t);
    if (map != null)
        //已经有别的线程创建了 则直接set
        map.set(this, value);
    else
        //否则就创建ThreadLocalMap 传入线程和用于存储的对象
        createMap(t, value);
    return value;
}
  • createMap 初始化ThreadLocalMap方法
//get和set都是懒加载创建的
void createMap(Thread t, T firstValue) {
    //给线程的变量ThreadLocalMap赋值 this->当前线程
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
//初始化线程中ThreadLocalMap的Entry数组 保存存储的对象
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //初始化Entry数组
    table = new Entry[INITIAL_CAPACITY];
    //根据当前线程的引用->hashcode 计算出数组下标
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    //保存存储的对象
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    //设置扩容因子 len * 2 / 3 大小16->达到10扩容
    //这里的扩容就是生成新的entry数组 然后把旧的值重新计算迁移过去
    setThreshold(INITIAL_CAPACITY);
}

3.5 remove()方法

//移除ThreadLocal变量
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    //还是线性探索去清除
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        //遍历数组 引用key相等 就直接清除
        if (e.get() == key) {
            e.clear();
           	//根据当前下标往后清理
            expungeStaleEntry(i);
            return;
        }
    }
}

4.ThreadLocal总结

源码分析到这里就结束了,我是根据自己的熟悉程度来写的,写的不是很详细,重点还是自己多看多断点

expungeStaleEntry(int staleSlot); 从staleSlot下标往后清理脏Entry
cleanSomeSlots(int i, int n); 从i下标往后清理脏Entry n->循环截止的次数
replaceStaleEntr(i) 从数组位置i的前后清理脏Entry

在threadLocal的生命周期里,针对threadLocal存在的内存泄漏的问题,都会通过expungeStaleEntry,cleanSomeSlots,replaceStaleEntry这三个方法清理掉key为null的脏entry.

5.强引用 & 弱引用(WeakReference)- 为什么会用弱引用?

从上面的分析,我们得知每个线程都有自己的ThreadLocalMap , 里面维护着一个Entry对象,key是弱引用指向线程(线程执行完毕,就会被回收,key就没有引用了),而 Object value = v是强引用指向我们的存储对象实例,也就是没有remove时会有一种情况:

线程执行完毕了,线程声明周期结束,线程就会被GC回收,那么Entry中的key引用会为null,但是value此时还是存在的

按照代码分析,每次进行set、get、remove都会去做Entry脏数据的移除,那为什么不remove还是会出现内存泄漏问题?

经过多方资料查询,如果一个线程执行链路长且在最后没有终止的过程中,value也没有继续使用了,那么value 的强引用就一直还在,就会有可能导致内存泄漏,所以使用完最好都调用remove方法防止内存泄漏

  • Java中弱引用示列
@Data
static class ReqData {
    String name;
    String reqId;
}   
static class WeakReferenceBean extends WeakReference<ReqData> {
    public WeakReferenceBean(ReqData referent) {
        super(referent);
    }
}
public static void main(String[] args) throws InterruptedException {
    ReqData reqData = new ReqData();
    reqData.setReqId("124578");
    reqData.setName("弱引用");
    WeakReferenceBean bean = new WeakReferenceBean(reqData);
    System.out.println("before:" + bean.get()); //->ReqData(name=弱引用, reqId=124578)
    //实例对象置为空 然后调用GC
    reqData = null;
    System.gc();
    //保证已经被回收 虚拟机加上参数-XX:+PrintGCDetails 可以看到回收日志
    Thread.sleep(2000);
    System.out.println("after:" + bean.get());//->null
}

以上就是本章的全部内容了。

上一篇:J.U.C中的工具类及原理分析(CountDownLatch、Semaphore、CyclicBrrier)
下一篇:一文带你熟透Java线程池的使用及源码

路漫漫其修远兮,吾将上下而求索

上一篇:726. 原子的数量


下一篇:数据结构—完全二叉堆