动手实践路之Netty的FastThreadLocal篇
本系列文章着重点在于实践,通过简短的代码剖析其内部工作原理,抓住其主要工作流程,再利用动手实践这一环节来真正掌握这门技术。
开胃菜
介绍
? 首先,我们先看下这段注释
/**
* A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a
* {@link FastThreadLocalThread}.
*/
大致说的是:FastThreadLocal是ThreadLocal的一个变体,假如和FastThreadLocalThread搭配使用则可以产生更高的访问性能。
? 我们先看个对照表
JDK | Netty |
---|---|
Thread | FastThreadLocalThread |
ThreadLocal | FastThreadLocal |
ThreadLocalMap | InternalThreadLocalMap |
那Netty作为高性能的网络通信框架,为什么不优先选择JDK的ThreadLocal而是选择自己构造一个FastThreadLocal,那Fast的表现在哪里呢? 现在我们再看一段注释
/* a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table,
* to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash
* table, and it is useful when accessed frequently.
*/
原来,FastThreadLocal的fast表现在其底层巧妙的设计。在Netty中,每创建一个FastThreadLocal对象就会为它分配一个不重复且唯一的常量(a constant index),它作为数组的下标,在插入和查找都是O(1)的操作;反观,JDK的ThreadLocal 底层采用的HashCode和HashTable,必然就会有Hash冲突,而ThreadLocal在解决hash冲突时的做法就是采用线性探测法来解决冲突,其性能固然下降。
正餐
分析
? 可以确定的方向是FastThreadLocal的底层采用数组的数据结构的方式,现在我们来看看Netty中FastThreadLocalThread、FastThreadLocal、InternalThreadLocalMap三者之间的关系。
该图描述三者之间的关系,简单点说三个线程都运行则每个线程都有自己的InternalThreadLocalMap,set和get时根据ftl的index确定其位置来进行操作。
具体实践
1、InternalThreadLocalMap
属性
// 唯一ID ,线程安全
private static final AtomicInteger nextIndex = new AtomicInteger();
// 存储数据
private Object[] indexedVariables;
// 数组长度
private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
// 默认存储值
public static final Object UNSET = new Object();
方法
// 构造函数
private InternalThreadLocalMap() {
// 初始化数组
indexedVariables = newIndexedVariabledTable();
}
private static Object[] newIndexedVariabledTable() {
Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
Arrays.fill(array, UNSET);
return array;
}
//得到下标
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
return index;
}
// 插入
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
System.out.println(Thread.currentThread()+" "+(String)value);
return oldValue == UNSET;
} else {// 扩容处理
}
return true;
}
// 取值
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length ? lookup[index] : UNSET;
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
}
return null;
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
2、FastThreadLocal
属性
private final int index;// 下标
// FastThreadLocal
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
//InternalThreadLocalMap
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
return index;
}
FastThreadLocal的下标初始化过程是在构造函数中调用InternalThreadLocalMap的nextVariableIndex方法得到一个不会重复的整数。
set( V value)
// FastThreadLocal
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
threadLocalMap.setIndexedVariable(index, value);
}
}
|
|
//InternalThreadLocalMap
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
}
return null;
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
//InternalThreadLocalMap
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {// 扩容处理
}
return true;
}
set的大致流程:
- 通过InternalThreadLocalMap.get()得到当前线程的InternalThreadLocalMap。
- 再调用InternalThreadLocalMap.setIndexedVariable方法设置值(更具index确定位置再用新值替换旧值)
v get()
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
e.printStackTrace();
}
threadLocalMap.setIndexedVariable(index, v);
return v;
}
protected V initialValue() throws Exception {
return null;
}
get()大致流程
- 通过InternalThreadLocalMap.get()得到当前线程的InternalThreadLocalMap。
- 判断InternalThreadLocalMap是否创建
- 已创建,根据下标直接拿到值 否则 初始化创建并赋值。
3、FastThreadLocalThread
public class FastThreadLocalThread extends Thread {
private InternalThreadLocalMap threadLocalMap;
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
public FastThreadLocalThread(Runnable target) {
super(target);
}
}
大致是进程Thread,添加InternalThreadLocalMap的属性。
测试用例
Things
public class Things {
private int ID;//唯一标识
private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
@Override
protected String initialValue() throws Exception {
return "FastThreadLocal:Things";
}
};
public Things(int id) {
this.ID = id;
}
public void say() {
System.out.println(Thread.currentThread() + " " + threadLocal.get());
}
public void set(String s) {
threadLocal.set(s + " " + ID);
}
}
ThingsRun
public class ThingsRun implements Runnable{
private Things things;
public ThingsRun(Things things){
this.things=things;
}
@Override
public void run() {
things.say();
things.set(System.currentTimeMillis()+" ");
things.say();
}
}
FastThreadLocalThreadTest
@Test
public void run() throws InterruptedException {
Things t1 = new Things(1);
Things t2 = new Things(1);
ThingsRun tr1 = new ThingsRun(t1);
ThingsRun tr2 = new ThingsRun(t2);
new FastThreadLocalThread(tr1).start();
new FastThreadLocalThread(tr2).start();
Thread.sleep(300000);
}
result
Thread[Thread-0,5,main] FastThreadLocal:Things
Thread[Thread-0,5,main] FastThreadLocal:Things
Thread[Thread-0,5,main] 1621350074996 1
Thread[Thread-0,5,main] 1621350074996 1
Thread[Thread-1,5,main] FastThreadLocal:Things
Thread[Thread-1,5,main] FastThreadLocal:Things
Thread[Thread-1,5,main] 1621350074996 1
Thread[Thread-1,5,main] 1621350074996 1
结语
本文的完整代码地址:https://github.com/ThreadNew/HandsOnCode/tree/master/HandsOn
本人典型的码农性格,不善表达;文中的表达有误请谅解,如有错误请指出谢谢!祝大家码到成功!。