业务背景
前段时间在工作项目中,被指派一项优化任务,由于原功能耗时较久,影响使用体感;梳理完该功能涉及到的业务后,发现具备复杂、量大等特点。
将一个大体量数据循环分割成若干小循环,并行执行,结果汇总,是该优化方案中的一环。但原业务中使用ThreadLocal。
ThreadLocal是解决线程间数据隔离性的,源于它将值存储在每个线程Thread的自己变量(ThreadLocal.ThreadLocalMap)中,但是ThreadLocal有个弊端,由于线程间隔离性,子线程无法捕获父线程ThreadLocalMap里的值。一旦使用多线程,需要解决值传递问题。
尝试使用InheritableThreadLocal(Itl)解决问题
Thread线程有两个ThreadLocal.ThreadLocalMap 类型变量threadLocals 和 inheritableThreadLocals。
ThreadLocal.ThreadLocalMap threadLocals = null; ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
inheritableThreadLocals 是在Thread 初始化方法init中赋值。
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); // 创建一个新ThreadLocalMap 并将父线程值传递过去
InheritableThreadLocal是可以解决子父线程值传递问题。我使用Itl 代替了Tl。待优化工作完毕,本地调试及测试通过后。发布到测试环境验证中,在测试环境中发现功能出现了异常,重新回归到本地测试,本地功能多次反复点测却又是完好的。我一度认为环境问题导致,却毫无根据。开始输出日志定位排查,发现测试环境部分线程没有获取到主线程set的值。
开始思考其中原因。想到了线程池问题,项目中使用了线程池,点测该功能前,线程池是有部分线程存于活跃状态,该部分线程没有经过创建时值传递过程,在执行该功能时,获取不到父线程set的值。
本地环境却是好的,这也不奇怪了。在我本地程序重启后,直接点测了该功能,线程池中的线程都是在该任务中创建的,全部携带了父线程set的值。
使用TransmittableThreadLocal解决线程池引起的问题
查阅了资源,决定使用Alibaba开源的TransmittableThreadLocal,根据官网描述:TransmittableThreadLocal(简称TTL),"在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题"。在使用TransmittableThreadLocal后,测试环境线程池引发的问题确实得到了解决。
官网地址:https://github.com/alibaba/transmittable-thread-local
知其然知其所以然
Ttl定义:
TransmittableThreadLocal<T> extends InheritableThreadLocal<T> //集成了Itl,具备Itl的功能.子线程被创建后,继承了父线程的值
Ttl有个类变量hold:
/** * hold是一个InheritableThreadLocal类型变量,持有WeakHashMap,重写了initialValue()和childValue()方法, * 异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象 */ private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } };
该hold变量数据来源:
public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); //往hold变量添加值 return value; } public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // 如果值为空 就移除掉 remove(); } else { super.set(value); addThisToHolder();//往hold变量添加值 } } "unchecked") ( private void addThisToHolder() { if (!holder.get().containsKey(this)) { //开始往hold 变量中存值 holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap 支撑null值 //这里打印一句话 方面梳理整个工作流程 System.out.println(String.format("%s线程已成功在hold变量中存储值。key:%s,value=%s",Thread.currentThread().getName(),this.hashCode(),this.get())); } }
以上代码做个总结:Ttl有一个InheritableThreadLocal类型hold变量,通过线程的set,get操作触发addThisToHolder()方法进行添加值。
另外需要注意的是:Ttl不是直接执行Runnable的run方法,而是通过TtlRunnable对run方法做了一层封装。
先看下Runnable 构造方法:
private TtlRunnable( Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.capturedRef = new AtomicReference<Object>(capture()); //会触发 capture()方法 this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
在TtlRunnable构造方法中,会触发capture(),将数据(主线程set的值)储存在capturedRef变量中。
接着看下capture() 方法:
public static Object capture() { //从父线程捕获的值形成快照对象 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>(); //这里打印一下语句 方便掌握整个流程 System.out.println(String.format("----------------------------------------------------------------------------")); System.out.println(String.format("capture()开始从hold变量捕获数据")); int i=0; /** * 这里遍历hold,取出父线程set进去的值,存储到HashMap类型的ttl2Value变量中 */ for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); i++; System.out.println(String.format("从hold变量中拿到的数据%s:key=%s, value=%S",i,threadLocal.hashCode(),threadLocal.get())); } return ttl2Value; }
以上代码做个总结:TtlRunnable在构建时,触发capture()方法,将父线程存储到hold变量中的值形成快照(Snapshot)存档下来。
接着看TtlRunnable的run方法:
public void run() { final Object captured = capturedRef.get(); //capture()方法返回的值 if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } /** * replay方法比较关键, 做了几件事 * 1、遍历hold变量,然后将数据备份到map类型的backup变量中 * 2、判断hold变量中是否存在captured变量中不存在的元素,如有,将从hold移除 * 3、为当前子线程赋值 * backup用于恢复数据,任务执行完毕,需要回归线程池,需要恢复其原本变量值 */ final Object backup = replay(captured); try { runnable.run(); //此时子线程已有值,可以执行业务 } finally { /** * 1、根据backup备份数据,将hold数据复原 * 2、根据backup备份数据,将子线程重新恢复原有数值 */ restore(backup); } }
TtlRunnable的run方法中, replay()和restore()方法比较关键。
先看 replay()方法:
public static Object replay( Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues( HashMap<TransmittableThreadLocal<Object>, Object> captured) { HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>(); //以下打印语句 方面梳理整个工作流程 System.out.println("--------------------------------------------------------------"); System.out.println(String.format("replayTtlValues开始,%s线程从holder变量里的获取数据列表",Thread.currentThread().getName())); //此循环是我手动写的 源码没有 int i=0; for (Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();){ TransmittableThreadLocal<Object> threadLocal= iterator.next(); i++; System.out.println(String.format("%s线程从hold拿到的数据%s:key=%s, value=%s",Thread.currentThread().getName(), i,threadLocal.hashCode(), threadLocal.get())); } for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); backup.put(threadLocal, threadLocal.get()); /** *从hold变量清理点captured变量中不存在的值,避免在执行业务(run方法)时存在数据污染 * * 注意:captured存储的变量和此时hold存储的变量有可能不一致 * 原因:captured 变量存储的是父线程放进去的值。在TtlRunnable构建方法中触发capture() 存入到captured的值 * 而此时从hold.get()获取的值,是子线程被创建时从父线获取的。触发了Thread init初始化方法。 * * 为什么要执行该清理操作? * 这里需要举例子说明: 如果子线程在被创建是已经从父现在那里获取到了值 value=3,该子线程在线程池中如果没有消灭,在某个主线程中再次 * 执行任务时,假如该主线程 没有设置value=3这个值,此时captured变量中就不会有value=3 那么该子线程执行run方法时 就会取到vlue=3这个值,污染了业务数据 * */ if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } /** * 这里将captured变量中的值 set到本线程中,进而触发了addThisToHolder()方法, * 该方法执行后,hold变量有可能会发生变化 */ setTtlValuesTo(captured); // 用于功能拓展 doExecuteCallback(true); //以下打印语句 方面梳理整个工作流程 System.out.println("--------------------------------------------------------------"); System.out.println(String.format("replayTtlValues结束,%s线程从holder变量里的获取数据列表",Thread.currentThread().getName())); i=0; for (Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();){ TransmittableThreadLocal<Object> threadLocal= iterator.next(); i++; System.out.println(String.format("%s线程从hold拿到的数据%s:key=%s, value=%s",Thread.currentThread().getName(), i,threadLocal.hashCode(), threadLocal.get())); } return backup; }
接着看restore()方法:
public static void restore( Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues( HashMap<TransmittableThreadLocal<Object>, Object> backup) { // call afterExecute callback doExecuteCallback(false); //以下打印语句 方面梳理整个工作流程 在源代码中不存在 System.out.println("--------------------------------------------------------------"); System.out.println("restoreTtlValues开始,从holder变量里的获取数据列表"); int i=0; for (Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();){ TransmittableThreadLocal<Object> threadLocal= iterator.next(); i++; System.out.println(String.format("从hold拿到的数据%s:key=%s, value=%s", i,threadLocal.hashCode(), threadLocal.get())); } /** * 对holder再次遍历,然后做数据清洗工作 * backup 也就是replay方法中对子线程原本持有的数据备份,而此时的hold变量,经过replay方法后, * 已经包括了某些主线程set进去的值 * 所有需要借助backup 清理hold变量,使之恢复到子线程最之初,也就是被创建时携带的数据 */ for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } //以下打印语句 方面梳理整个工作流程 在源代码中不存在 System.out.println("--------------------------------------------------------------"); System.out.println("restoreTtlValues结束,从holder变量里的获取数据列表"); i=0; for (Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();){ TransmittableThreadLocal<Object> threadLocal= iterator.next(); i++; System.out.println(String.format("从hold拿到的数据%s:key=%s, value=%s", i,threadLocal.hashCode(), threadLocal.get())); } /** * 1、根据backup备份数据,将hold数据复原 * 2、根据backup备份数据,将子线程重新恢复原有数值 */ setTtlValuesTo(backup); }
以上代码做个总结:在TtlRunnable的run方法中,先拿到captured快照数据,调用replay()方法,通过对hold变量遍历,对子线程原本数据对备份(backup),然后对比快照数据,清洗到hold变量中快照数据不存在的值。执行Runnable中的run方法,最后调用restore()方法,进行对hold变量和子线程持有数据进行恢复。
测试工作流程
核心代码已经结束了。为了更直观查看流程,从官网clone下来源代码,进行测试。
测试方法:
public class TtlControllerTest { private static ExecutorService executorService = Executors.newFixedThreadPool(1); public static void main(String[] args) throws InterruptedException { final ThreadLocal tl1 = new TransmittableThreadLocal(); final ThreadLocal tl2 = new TransmittableThreadLocal(); tl1.set(1); executorService.execute(TtlRunnable.get(new Runnable() { public void run() { try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } } })); new Thread() tl2.set(3); Thread.sleep(1000L); //这块停顿一下 executorService.execute(TtlRunnable.get(new Runnable() { public void run() { try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } } })); } }
查看流程结果
main线程已成功在hold变量中存储值。key:1282788025,value=1 ---------------------------------------------------------------------------- capture()开始从hold变量捕获数据 从hold变量中拿到的数据1:key=1282788025, value=1 main线程已成功在hold变量中存储值。key:2101440631,value=3 -------------------------------------------------------------- replayTtlValues开始,pool-1-thread-1线程从holder变量里的获取数据列表 pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1 -------------------------------------------------------------- replayTtlValues结束,pool-1-thread-1线程从holder变量里的获取数据列表 pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1 -------------------------------------------------------------- restoreTtlValues开始,从holder变量里的获取数据列表 从hold拿到的数据1:key=1282788025, value=1 -------------------------------------------------------------- restoreTtlValues结束,从holder变量里的获取数据列表 从hold拿到的数据1:key=1282788025, value=1 ---------------------------------------------------------------------------- capture()开始从hold变量捕获数据 从hold变量中拿到的数据1:key=2101440631, value=3 从hold变量中拿到的数据2:key=1282788025, value=1 -------------------------------------------------------------- replayTtlValues开始,pool-1-thread-1线程从holder变量里的获取数据列表 pool-1-thread-1线程从hold拿到的数据1:key=1282788025, value=1 pool-1-thread-1线程已成功在hold变量中存储值。key:2101440631,value=3 -------------------------------------------------------------- replayTtlValues结束,pool-1-thread-1线程从holder变量里的获取数据列表 pool-1-thread-1线程从hold拿到的数据1:key=2101440631, value=3 pool-1-thread-1线程从hold拿到的数据2:key=1282788025, value=1 -------------------------------------------------------------- restoreTtlValues开始,从holder变量里的获取数据列表 从hold拿到的数据1:key=2101440631, value=3 从hold拿到的数据2:key=1282788025, value=1 -------------------------------------------------------------- restoreTtlValues结束,从holder变量里的获取数据列表 从hold拿到的数据1:key=1282788025, value=1