并发编程 ThreadLocal源码分析

引入ThreadLocal的场景

最近在写项目的时候自己也有疑虑,因为这个项目涉及到爬虫,使用的WebMagic这款垂直爬虫框架

  • 垂直式爬虫

    垂直型爬虫关注内容与准确还有效率.比较常见的就是舆情项目,财经项目等.仅仅抓取到有效有用的数据,并且在爬虫 抓取之初就能够把抓取到的内容进行简单的处理: 如.提取标题,内容,时间等.

  • WebMagic的大致逻辑

    通过实现PageProcessor接口重写process方法,根据爬取规则制定相应的Site对象,通过实现Pipeline定制自己的爬取到数据之后的逻辑处理

问题发现

Spring项目基本上很多有自己逻辑的重写类都被声明为了 @Component,这个项目中实现爬虫爬取逻辑的Processor类也不例外被声明成了 @Component ,而这种注解的功能就是将自己重写后的逻辑注入到Spring容器成为一个 Bean 之后可以通过 @AutoWired 注入到逻辑需要的地方

那么问题来了,如果使用WebMagic这个开源爬虫框架去实现爬取逻辑,实现的Processor类中如果存在成员变量,并且在多线程启动这个Processor的时候,对该类中的成员变量进行了修改,也就是说多线程情况下对单例的Processor对象成员变量进行了写操作,这个时候就应该好好地去考虑一下并发问题了

@Component
public class BorrowedBookListProcessor implements PageProcessor {


    private Logger logger = LoggerFactory.getLogger(getClass());

    private String sessionId;

    private Site site = Site.me().setRetryTimes(3).setTimeOut(10000).setCharset("GBK");
    
    …………
}

上述代码为实现爬取逻辑的Processor类的部分代码,可以看到除Site和Logger成员变量之外还有一个并没有任何采取同步措施的sessionId的String对象

这个类的逻辑:

  1. 后端模拟登录之后,登录成功取出Cookie中的sessionId,并且在之后的爬取中带上这个sessionId去爬取对应学生的相应数据
  2. 如果一条线程使用Processor爬取的时候被阻塞或者正在尝试重新爬取这个时候另一个用户登录 传入新的sessionId 虽然是在一条新的线程里,但是这个时候因为对新线程中的sessionId发生了新的赋值就是写操作,这个时候sessionId就可能会被写回主内存进行更新(虽然这个时候这种可见性可能不会立马被修改过后的线程写回主存,但是这个时候已经存在了并发问题的风险了)
  3. 一旦主存更新之后,每条线程依然通过可见性原则就会取到最新的值,这个时候就并发问题就发生了

在使用到Spring的时候,也同样有很多源码的实现也同样用到了ThreadLocal变量去解决并发问题

ThreadLocal是什么

并发编程 ThreadLocal源码分析
Java内存模型

在Java内存模型中,可以看出来,每一条线程使用共享变量X的时候都是先从主内存中拷贝一份放置在自己的工作内存进行使用,但是ThreadLocal变量给我感觉有点不遵守JMM规范了..

ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储。ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。

class ConnectionManager {
 
    private static Connection connect = null;
     
    public static Connection openConnection() {
        if(connect == null){
            connect = DriverManager.getConnection();
        }
        return connect;
    }
     
    public static void closeConnection() {
        if(connect!=null)
            connect.close();
    }
}

假设有这样一个数据库连接管理类,这段代码在单线程中使用是没有任何问题的,但是如果在多线程中使用呢?很显然,在多线程中使用会存在线程安全问题:

  • 这里面的2个方法都没有进行同步,很可能在openConnection方法中会多次创建connect;
  • 由于connect是共享变量,那么必然在调用connect的地方需要使用到同步来保障线程安全,因为很可能一个线程在使用connect进行数据库操作,而另外一个线程调用closeConnection关闭连接。

上面描述到的就是对共享资源最常见的一种并发问题,这也就是说明,在编写这段代码的时候,就需要考虑到并发问题,可是到底是通过synchronized这种较为重量级的同步方式去保护connect还是通过更轻量的的方式去解决这种并发问题呢

试想一下,如果多线程情况下需要使用到connect变量,这个变量到底应不应该是一个共享变量,如果每条线程都有属于自己的connect变量,这样也就不会再需要共享变量,也就更不需要去考虑这种无所谓的并发问题,因为每个connect是属于每条线程的,直接使用即可,不需要再去经过读取主存,修改后写回主存保证可见性的这种容易引起并发问题的操作

还有这样一种思路,就是在每一条线程中创建这个ConnectionManager对象,进行管理conect,使用的时候创建并获取连接,不需要的时候关闭连接,可是这样的话,如果是一个并发量较大的程序,这种方式无疑给服务器造成了很重的负担,比如如下代码

class ConnectionManager {
 
    private  Connection connect = null;
     
    public Connection openConnection() {
        if(connect == null){
            connect = DriverManager.getConnection();
        }
        return connect;
    }
     
    public void closeConnection() {
        if(connect!=null)
            connect.close();
    }
}


class Dao{
    public void insert() {
        ConnectionManager connectionManager = new ConnectionManager();
        Connection connection = connectionManager.openConnection();
         
        //使用connection进行操作
         
        connectionManager.closeConnection();
    }
}

这种情况下,引入ThreadLocal就在合适不过了 ,ThreadLocal在每个线程中,对使用的变量都会创建一个单独副本,也就是说,每条线程虽然使用的是实现统一目的的变量,但是这个变量属于每一个线程,之间互不干扰,也不会存在任何线程安全问题

但是要注意,虽然ThreadLocal能够解决上面说的问题,但是由于在每个线程中都创建了副本,所以要考虑它对资源的消耗,比如内存的占用会比不使用ThreadLocal要大。

如果在多线程并发环境中,一个可变对象涉及到共享与竞争,那么该可变对象就一定会涉及到线程间同步操作,这是多线程并发问题。

否则该可变对象将作为线程私有对象,可通过ThreadLocal进行管理,实现线程间私有对象隔离的目的。

可以发现,ThreadLocal并没有解决多线程并发的问题,因为ThreadLocal管理的可变对象的性质本来就不会涉及到多线程并发而引发的共享、竞争和同步问题,使用ThreadLocal管理只是方便了多线程获取和使用该私有可变对象的途径和方式。

《Java并发编程实践》

线程本地(ThreadLocal)变量通常用于防止在基于可变的单体(Singleton)或全局变量的设计中,出现(不正确的)共享。

ThreadLocal源码分析

看一下官方为ThreadLocal这个类写的JavaDoc

/**
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * {@code get} or {@code set} method) has its own, independently initialized
 * copy of the variable.  {@code ThreadLocal} instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
 *
 * <p>Each thread holds an implicit reference to its copy of a thread-local
 * variable as long as the thread is alive and the {@code ThreadLocal}
 * instance is accessible; after a thread goes away, all of its copies of
 * thread-local instances are subject to garbage collection (unless other
 * references to these copies exist).
 *
 * @author  Josh Bloch and Doug Lea
 * @since   1.2
 */

上面这段JavaDoc是源码上的JavaDoc,翻译一下的大概意思是

该类提供线程局部变量,这些变量与普通对应变量的不同之处在于,访问一个变量的每个线程通过它自己的get/set方法都有自己独立初始化的变量副本。ThreadLocal实例通常是为了关联一种状态(例如用户ID或者事务ID)并且私有在各自的线程中

每条线程都拥有对其本地线程副本的隐式引用变量,只要线程处于活跃状态并且ThreadLocal实例可以访问,当一个线程消失之后,它的所有副本线程局部实例奖杯垃圾回收(除非还存在其他对这些副本的引用)

在这个JavaDoc中还提供了一个例子来说明如何使用这个LocalThread变量

/**
 * <p>For example, the class below generates unique identifiers local to each
 * thread.
 * A thread's id is assigned the first time it invokes {@code ThreadId.get()}
 * and remains unchanged on subsequent calls.
 * <pre>
*/
public class ThreadId {
    // Atomic integer containing the next thread ID to be assigned
    private static final AtomicInteger nextId = new AtomicInteger(0);

    // Thread local variable containing each thread's ID
    private static final ThreadLocal<Integer> threadId =
            ThreadLocal.withInitial(() -> nextId.getAndIncrement());

    // Returns the current thread's unique ID, assigning it if necessary
    public static int get() {
        return threadId.get();
    }
}

例如,上面生成的类生成每个线程本地的唯一标识符。线程的id在第一次调用{@code ThreadId.get()}时分配,并在后续调用中保持不变。

ThreadLocal的几个主要方法

  • public T get() {}

    get()方法是用来获取ThreadLocal在当前线程中保存的变量副本

  • public void set(T value) {}

    set()用来设置当前线程中变量的副本

  • public void remove() {}

    remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。

  • protected T initialValue() {}

    返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类重写而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的默认实现直接返回一个null。

get()方法

/**
 * Returns the value in the current thread's copy of this
 * thread-local variable.  If the variable has no value for the
 * current thread, it is first initialized to the value returned
 * by an invocation of the {@link #initialValue} method.
 *
 * @return the current thread's value of this thread-local
 */    
public T get() {
    //获取当前线程
    Thread t = Thread.currentThread();
    //通过当前变量返回当前线程对应ThreadLocalMap实例
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //获取ThreadLocalMap中的实体类实例
        //注意这里方法传参是this而不是t
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            //如果获得的实体实例不为空则通过调用e的value获取当前线程保存的变量值
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    
    //如果map为空,延迟执行初始化方法
    return setInitialValue();
}

如果map为空的情况下,会调用setInitialValue();方法初始化map的同时以ThreadLocal实例作为key默认实现的null作为值进行存储

ThreadLocalMap是什么

  • ThreadLocalMap是ThreadLocal的静态内部类
  • ThreadLocalMap把其外部类ThreadLocal的实例对象作为key,把要管理的可变对象作为value
  • ThreadLocalMap的实例对象,由当前当前线程对象的Thread的实例持有,而不是ThreadLocal持有
/**
 * ThreadLocalMap is a customized hash map suitable only for
 * maintaining thread local values. No operations are exported
 * outside of the ThreadLocal class. The class is package private to
 * allow declaration of fields in class Thread.  To help deal with
 * very large and long-lived usages, the hash table entries use
 * WeakReferences for keys. However, since reference queues are not
 * used, stale entries are guaranteed to be removed only when
 * the table starts running out of space.
 */
static class ThreadLocalMap {

    /**
     * The entries in this hash map extend WeakReference, using
     * its main ref field as the key (which is always a
     * ThreadLocal object).  Note that null keys (i.e. entry.get()
     * == null) mean that the key is no longer referenced, so the
     * entry can be expunged from table.  Such entries are referred to
     * as "stale entries" in the code that follows.
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    …………
}

并且ThreadLocalMap变量的持有者是当前Thread并不是ThreadLocal

setInitialValue()方法

/**
 * Variant of set() to establish initialValue. Used instead
 * of set() in case user has overridden the set() method.
 *
 * @return the initial value
 */
private T setInitialValue() {
    //调用ThreadLocal默认实现的initialValue()方法
    T value = initialValue();
    Thread t = Thread.currentThread();
    //通过当前线程t来获取ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null)
        //设置初始值
        map.set(this, value);
    else
        //若map为null则为当前线程t创建属于它自己的ThreadLocalMap实例
        createMap(t, value);
    return value;
}

initialValue()方法

/**
 * Returns the current thread's "initial value" for this
 * thread-local variable.  This method will be invoked the first
 * time a thread accesses the variable with the {@link #get}
 * method, unless the thread previously invoked the {@link #set}
 * method, in which case the {@code initialValue} method will not
 * be invoked for the thread.  Normally, this method is invoked at
 * most once per thread, but it may be invoked again in case of
 * subsequent invocations of {@link #remove} followed by {@link #get}.
 *
 * <p>This implementation simply returns {@code null}; if the
 * programmer desires thread-local variables to have an initial
 * value other than {@code null}, {@code ThreadLocal} must be
 * subclassed, and this method overridden.  Typically, an
 * anonymous inner class will be used.
 *
 * @return the initial value for this thread-local
 */
protected T initialValue() {
    return null;
}

这个默认实现的初始化值的方法很简单,直接返回一个null,可以看出,在setInitialValue()方法中只需要调用这个方法设置一个初始化的值,至于默认实现就是null,可以选择重写该方法实现自己想要设置的默认初始值

createMap()方法

/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

createMap()方法同样也很简单,如果当前线程不存在ThreadLocalMap实例变量,通过调用这个方法为当前线程实例化一个新的ThreadLocalMap对象,并且将ThreadLocal设置为ThreadLocalMap实例的key,将默认实现的null(value)作为ThreadLocalMap实例的值存储下来

getMap()方法

/**
 * Get the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param  t the current thread
 * @return the map
 */
ThreadLocalMap getMap(Thread t) {
    //返回当前线程的threadLocals变量,该变量的值为ThreadLocalMap类型
    return t.threadLocals;
}

从上面代码可以发现,每一个线程对象都有一个专属自己的ThreadLocalMap对象,而ThreadLocalMap对象存储了ThreadLocal对象与变量对象。线程对象、ThreadLocalMap对象、ThreadLocal对象、变量对象之间的关系如下图:

[图片上传失败...(image-5d3771-1532743824302)]

set(T value)方法

/**
 * Sets the current thread's copy of this thread-local variable
 * to the specified value.  Most subclasses will have no need to
 * override this method, relying solely on the {@link #initialValue}
 * method to set the values of thread-locals.
 *
 * @param value the value to be stored in the current thread's copy of
 *        this thread-local.
 */
public void set(T value) {
    //获取当前线程
    Thread t = Thread.currentThread();
    //当前线程获取当前线程内的ThreadLocalMap实例
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        //ThreadLocalMap实例为null的时候创建map
        createMap(t, value);
}
  • set方法用来修改或者初始化ThreadLocal管理的变量对象。
  • ThreadLocal对象调用get方法获取变量的,要么重写initialValue()方法,要么主动调用set方法,否则将返回null。

线程同步机制比较

ThreadLocal和线程同步机制相比有什么优势呢?

可以这么说Synchronized用于线程间的数据共享(同步)而ThreadLocal则用于线程间的数据隔离(在每一条线程中创建属于自己的ThreadLocal变量)

在同步机制中,通过对象的锁机制保证同一时间只有一个线程访问变量。这时该变量是多个线程共享的,使用同步机制要求慎密地分析什么时候对变量进行读写,什么时候需要锁定某个对象,什么时候释放对象锁等繁杂的问题,还记得一个重入问题当时自己都看了很久到现在可能会还是有些疏漏

而ThreadLocal则从另一个角度来解决多线程的并发访问。ThreadLocal会为每一个线程提供一个独立的变量副本,从而隔离了多个线程对数据的访问冲突。因为每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的变量封装进ThreadLocal。

概括起来说,对于多线程资源共享的问题,同步机制采用了“以时间换空间”的方式,而ThreadLocal采用了“以空间换时间”的方式。前者仅提供一份变量,让不同的线程排队访问,而后者为每一个线程都提供了一份变量,因此可以同时访问而互不影响。

Spring使用ThreadLocal解决线程安全问题,我们知道在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域。就是因为Spring对一些Bean(如RequestContextHolder、TransactionSynchronizationManager、LocaleContextHolder等)中非线程安全状态采用ThreadLocal进行处理,让它们也成为线程安全的状态,因为有状态的Bean就可以在多线程*享了。

一般的Web应用基本采用MVC三个层次,在不同的层中编写对应的逻辑,下层通过接口向上层开放功能调用。在一般情况下, 从接收请求到返回响应所经过的所有程序调用都同属于一个线程。

上一篇:Oracle内核技术揭密. 导读


下一篇:MaxCompute与DataWorks权限介绍和示例