动手实践路之Netty的FastThreadLocal篇

动手实践路之Netty的FastThreadLocal篇

本系列文章着重点在于实践,通过简短的代码剖析其内部工作原理,抓住其主要工作流程,再利用动手实践这一环节来真正掌握这门技术。

开胃菜

介绍

? 首先,我们先看下这段注释

/**
 * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a
 * {@link FastThreadLocalThread}.
 */

大致说的是:FastThreadLocal是ThreadLocal的一个变体,假如和FastThreadLocalThread搭配使用则可以产生更高的访问性能。

? 我们先看个对照表

JDK Netty
Thread FastThreadLocalThread
ThreadLocal FastThreadLocal
ThreadLocalMap InternalThreadLocalMap

那Netty作为高性能的网络通信框架,为什么不优先选择JDK的ThreadLocal而是选择自己构造一个FastThreadLocal,那Fast的表现在哪里呢? 现在我们再看一段注释

/* a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table,
 * to look for a variable.  Although seemingly very subtle, it yields slight performance advantage over using a hash
 * table, and it is useful when accessed frequently.
 */

原来,FastThreadLocal的fast表现在其底层巧妙的设计。在Netty中,每创建一个FastThreadLocal对象就会为它分配一个不重复且唯一的常量(a constant index),它作为数组的下标,在插入和查找都是O(1)的操作;反观,JDK的ThreadLocal 底层采用的HashCode和HashTable,必然就会有Hash冲突,而ThreadLocal在解决hash冲突时的做法就是采用线性探测法来解决冲突,其性能固然下降

正餐

分析

? 可以确定的方向是FastThreadLocal的底层采用数组的数据结构的方式,现在我们来看看Netty中FastThreadLocalThread、FastThreadLocal、InternalThreadLocalMap三者之间的关系。

动手实践路之Netty的FastThreadLocal篇

动手实践路之Netty的FastThreadLocal篇

该图描述三者之间的关系,简单点说三个线程都运行则每个线程都有自己的InternalThreadLocalMap,set和get时根据ftl的index确定其位置来进行操作。

具体实践

1、InternalThreadLocalMap

属性

	// 唯一ID ,线程安全
    private static final AtomicInteger nextIndex = new AtomicInteger();
    // 存储数据
    private Object[] indexedVariables;
    // 数组长度
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
    // 默认存储值
    public static final Object UNSET = new Object();

方法

  // 构造函数
    private InternalThreadLocalMap() {
        // 初始化数组
        indexedVariables = newIndexedVariabledTable();
    }

    private static Object[] newIndexedVariabledTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }

    //得到下标
    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        return index;
    }

    // 插入
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            System.out.println(Thread.currentThread()+" "+(String)value);
            return oldValue == UNSET;
        } else {// 扩容处理
        }
        return true;
    }

	// 取值
    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length ? lookup[index] : UNSET;
    }

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        }
        return null;
    }

    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

2、FastThreadLocal

属性

 private final int index;// 下标
// FastThreadLocal
public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
//InternalThreadLocalMap
public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        return index;
}

FastThreadLocal的下标初始化过程是在构造函数中调用InternalThreadLocalMap的nextVariableIndex方法得到一个不会重复的整数。

set( V value)

// FastThreadLocal
public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            threadLocalMap.setIndexedVariable(index, value);
        }
    }

|
|

//InternalThreadLocalMap
 public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        }
        return null;
    }

 private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }


//InternalThreadLocalMap
  public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {// 扩容处理
        }
        return true;
    }

set的大致流程:

  1. 通过InternalThreadLocalMap.get()得到当前线程的InternalThreadLocalMap。
  2. 再调用InternalThreadLocalMap.setIndexedVariable方法设置值(更具index确定位置再用新值替换旧值)

v get()

 public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        return initialize(threadLocalMap);
    }

    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            e.printStackTrace();
        }
        threadLocalMap.setIndexedVariable(index, v);
        return v;
    }

    protected V initialValue() throws Exception {
        return null;
    }

get()大致流程

  1. 通过InternalThreadLocalMap.get()得到当前线程的InternalThreadLocalMap。
  2. 判断InternalThreadLocalMap是否创建
  3. 已创建,根据下标直接拿到值 否则 初始化创建并赋值。

3、FastThreadLocalThread

public class FastThreadLocalThread extends Thread {
    private InternalThreadLocalMap threadLocalMap;

    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
    public FastThreadLocalThread(Runnable target) {
        super(target);
    }
}

大致是进程Thread,添加InternalThreadLocalMap的属性。

测试用例

Things

public class Things {
    private int ID;//唯一标识
    private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
        @Override
        protected String initialValue() throws Exception {
            return "FastThreadLocal:Things";
        }
    };

    public Things(int id) {
        this.ID = id;
    }

    public void say() {
        System.out.println(Thread.currentThread() + " " + threadLocal.get());
    }

    public void set(String s) {
        threadLocal.set(s + " " + ID);
    }
}

ThingsRun

public class ThingsRun implements Runnable{
    private Things things;
    public ThingsRun(Things things){
        this.things=things;
    }
    @Override
    public void run() {
        things.say();
        things.set(System.currentTimeMillis()+" ");
        things.say();
    }
}

FastThreadLocalThreadTest

 @Test
    public void run() throws InterruptedException {
        Things t1 = new Things(1);
        Things t2 = new Things(1);
        ThingsRun tr1 = new ThingsRun(t1);
        ThingsRun tr2 = new ThingsRun(t2);
        new FastThreadLocalThread(tr1).start();
        new FastThreadLocalThread(tr2).start();
        Thread.sleep(300000);

    }

result

Thread[Thread-0,5,main] FastThreadLocal:Things
Thread[Thread-0,5,main] FastThreadLocal:Things
Thread[Thread-0,5,main] 1621350074996  1
Thread[Thread-0,5,main] 1621350074996  1
Thread[Thread-1,5,main] FastThreadLocal:Things
Thread[Thread-1,5,main] FastThreadLocal:Things
Thread[Thread-1,5,main] 1621350074996  1
Thread[Thread-1,5,main] 1621350074996  1

结语

本文的完整代码地址:https://github.com/ThreadNew/HandsOnCode/tree/master/HandsOn

本人典型的码农性格,不善表达;文中的表达有误请谅解,如有错误请指出谢谢!祝大家码到成功!。

动手实践路之Netty的FastThreadLocal篇

上一篇:爬虫:HTTP请求与HTML解析(爬取某乎网站)


下一篇:LeetCode 322. 零钱兑换 && 动态规划