《python解释器源码剖析》第16章--python的多线程机制

16.0 序

在介绍多线程之前,我们要先知道线程是什么,线程是操作系统调度cpu工作的最小单元,同理进程则是操作系统资源分配的最小单元,线程是需要依赖于进程的,并且每一个进程只少有一个线程,这个线程我们称之为主线程。而主线程则可以创建子线程,一个进程中有多个线程去工作,我们就称之为多线程。关于线程,请记住两句话,这两句话我们在前面章节中也已经提过了。

python中的一个线程,对应c语言中的一个线程,然后对应操作系统的一个线程,操作系统的线程我们一般称之为原生线程,这三者是一一对应的。

python中的线程是货真价实的线程,并且每一个线程都有对应的线程对象(PyThreadState),记录了当前线程所有的状态信息

至于多线程这个概念估计很多人再熟悉不过了,开发一个多线程应用程序是很常见的事情,很多语言都支持多线程,有的是原生支持,有的是通过库的支持。而python毫无疑问也支持多线程,并且它是通过库的方式实现的。提到python的多线程,会让人想到GIL(global interpreter lock)这个万恶之源,但是至少在我看来,这是一个不错的机制。关于python的多线程、GIL我们后面会具体介绍,以及python中的多线程为什么在很多人眼中很"鸡肋"。目前只需要知道python中的多线程是不能利用多核的,因为python虚拟机使用一个全局解释器锁(GIL)来控制线程对程序的执行,这个结果就使得无论你的cpu有多少核,但是同时被线程调度的cpu只有一个。

举个例子,首先线程若想执行程序,就必须拿到GIL这把全局锁,假设有两个线程A和B,A线程在执行的时候拿到了这把锁,那么B线程也想执行,但是锁被A拿走了,那咋办?不好意思只能等着了,然而A执行一会之后,会把锁释放,然后B才能拿到这把锁去执行,在B执行的时候,此时A就没办法执行了。因为锁只有一把,一个线程拿到锁去执行程序的时候,那么其他的线程就必须等待,只有当A把锁释放之后,其他线程才有机会执行。至于锁什么时候释放,python采取的策略如下:

当遇见io阻塞的时候会把锁释放,因为io阻塞是不耗费cpu的,所以此时虚拟机会把该线程的锁释放

即便是耗费cpu的运算等等,也不会一直执行,会在执行一小段时间之后释放锁,为了保证其他线程都有机会执行,就类似于cpu的时间片轮转的方式

16.1 GIL与线程调度

那么python为什么需要GIL呢?我们知道python虚拟机执行的是python的字节码,对于n += 1这样一条语句,python是分为LOAD_GLOBALLOAD_CONSTBINARY_ADDSTORE_NAME这样四条指令来执行的,千万不要以为n+=1是一个整体。那这个跟GIL有什么关系呢,举个栗子。

假设此时n=8,A和B同时执行n+=1,A线程拿到的n是8,还没来得及加1,就被其它线程加成9,但是A对应的还是原来的8,所以此时就会出现错误。因为线程是抢占式执行的,为了避免资源竞争,所以需要加上一把全局的超级大锁。但是即便如此,我们多个线程在对同一个数字进行相加减的时候,得到结果也基本上是不正确的。咦,不是说有全局锁吗?没错,是有锁,但是锁会被释放,如果A线程没来得及加1的时候被挂起了,然后B线程给n+=1对应的多个字节码都执行了,结果依旧是不对的。因此对于这种对应的多组字节码不是一个整体、而是会被打断的代码,即便有全局锁也是不行的,在多线程使用的时候依旧需要手动上锁(后面说)。可能有人为问,那么GIL存在的意义是什么,没有GIL的话,多个线程出现竞争能理解,但是有了GIL又会存在释放的问题,如果在关键时候释放了,导致其他线程碰巧又把值给改了,那么结果不是一样的吗?

所以我们说对于那些会被打断的字节码指令,即便有了全局锁也是不行的,但如果不会被打断呢?别急,我们再来看个例子

还是假设有两个线程A和B,两个线程内部都引用了全局对象obj,也就是说这个时候obj->ob_refcnt等于2。而如果A销毁对obj的引用,那么显然A将通过Py_DECREF来调整obj的引用计数值。我们知道,Py_DECREF的整个动作分为两部分:

--obj->ob_refcnt
if (obj -> ob_refcnt == 0){
    销毁obj
}

如果A在执行完第一个动作之后,obj->ob_refcnt的值变为1,然而不幸的是这个时候调度机制将线程A挂起了,而唤醒了B。而B也开始销毁obj,当B完成了第一个动作之后,ob->ob_refcnt变为0,但是B比较幸运,它是两个步骤一块都执行完了,此时已经将对象销毁、内存释放。然后A被重新唤醒,然后执行第二个步骤,但是此时的obj->ob_refcnt已经被减少到0,条件满足,那么A依旧会对obj进行释放,但是obj早已不在,那么会发生什么结果,只有天知道。

所以结论就是有一些操作是被保护的,比如del obj,GIL是一定会确保这行代码对应的所有指令都被执行完毕才会可能释放锁。

因此为了实现多线程机制,一个基本的要求就是需要实现不同线程对共享资源的互斥。因此在python中,就体现为无论何时在同一时间只有一个线程能访问python所提供的的api。因此如果是单核是无所谓的,但是对于多核的话,python这种做法无疑是进行了限制。但是在python1.5的时候,有两位老铁Greg SteinMark Hammond基于python1.5,把里面的GIL给去掉了,但是发现对于单线程来说,效率只有原来的一半,因为把GIL去掉了,就意味着需要更细粒度的锁,这就会导致大量的加锁、解锁,而加锁、解锁对于操作系统来说是一个比较重量级的操作。而且最关键的时候,没有了GIL,编写python扩展模块的难度大大增加。

我们说,去GIL的老铁有两位,分别是Greg SteinMark Hammond,这个Mark Hammond估计很多人都见过,如果没见过,说明你Windows安装python的时候不怎么关注。

《python解释器源码剖析》第16章--python的多线程机制

特别感谢,Mark Hammond,没有它这些年无偿分享的Windows专业技术,那么python如今仍会运行在DOS上

扯远了,我们可以对python的多线程建立一个粗略的模型。网上找到了一张图(https://www.cnblogs.com/zhangqigao/articles/7258364.html),非常的不错。

《python解释器源码剖析》第16章--python的多线程机制

所以最上面有着绿色背景的是python中的线程,要想通过虚拟机执行指令,就必须拿到GIL,但是只有一个线程能够拿到。然后会交给c语言中的线程,进行交给操作系统的线程去调度cpu执行,执行完毕之后再将结果返回。

所以python是通过c语言的线程调用原生线程(python线程是对c线程的一个封装),而每个线程在执行的过程中python解释器是控制不了的,因为调用的是c的接口,超出了python的控制范围。python的控制范围只有在python解释器这一层,一旦真正的线程启动之后,它就只能等待结果。如果多个线程一起调用,那么谁先返回python是不知道的。所以python的做法就是,在解释器的层面上加上一把锁,一次只让一个python线程穿过解释器调用原生线程执行不就好了吗?这样返回的结果不久确定了吗?因为同时就一个线程在执行嘛。

《python解释器源码剖析》第16章--python的多线程机制

就类似于上图的结果,你执行一会儿,我执行一会儿,这样原生线程执行完之后该把结果返回给谁,就不会出岔子了

因此我们知道,对于python而言,解释执行字节码是python的核心所在,所以python通过GIL来互斥不同线程调用解释器执行字节码。如果一个线程想要执行,就必须拿到GIL,而一旦拿到GIL,其他线程就无法执行了,如果想执行,那么只能等GIL释放、被自己获取之后才可以执行。然而实际上,GIL保护的不仅仅是python的解释器,同样还有python的C API,在C/C++和python混合开发时,在涉及到原生线程和python线程相互合作时,也需要通过GIL进行互斥,这一点我们后面会详细说。

那么问题来了,一个线程在获取到GIL之后,那么它要在何时释放呢?难道等一个线程执行完毕?要是这样的话,所有的线程不都变成串行的了,这样的多线程机制有啥意义呢?因此毫无疑问,python有一套线程的调度机制。这个我们在上面已经说过了,但是只是说了结论,关键的两个问题却没有说:

  • 在何时挂起线程,选择处于等待状态的下一个线程?
  • 在众多的处于等待状态的候选线程中,选择激活哪一个线程?

在python的多线程机制中,这两个问题是分别由不同的层次解决的。对于何时进行线程调度问题,是由python自身决定的。考虑一下操作系统是如何进行进程切换的,当一个进程运行了一段时间之后,发生了时钟中断,操作系统响应时钟,并开始进行进程的调度。同样,python中也是模拟了这样的时钟中断,来激活线程的调度。我们知道python字节码解释的原理是按照指令的顺序一条一条执行,python内部维护这一个数组,这个数值就是python内部的时钟。在python2中如果一个线程执行的执行的字节数达到了这个值,那么会进行线程切换,并且这个值在python中仍然存在

import sys
# 我们看到默认是执行100条字节码启动线程调度机制,进行切换
# 这个方法python2、3中都存在
print(sys.getcheckinterval())  # 100

# 但是在python3中,我们更应该使用这个函数,表示线程切换的时间间隔。
# 表示一个线程在执行0.05s之后进行切换
print(sys.getswitchinterval())  # 0.05

# 上面的方法我们都可以手动设置
# 通过sys.getcheckinterval(N)和sys.getswitchinterval(N)设置即可

除了执行之间之外,还有就是我们之前说的遇见IO阻塞的时候会进行切换,所以多线程在IO密集型还是很有用处的,说实话如果IO都不会自动切换的话,那么我觉得python的多线程才是真的没有用,至于为什么IO会切换我们后面说,总是现在我们知道python会在什么时候进行线程切换了。那么下面的问题就是,python在切换的时候会从等待的线程中选择哪一个呢?对于这个问题,python完全没有插手,而是同样借用了底层操作系统所提供的调度机制来决定下一个进入python解释器的线程究竟是谁。

对于python的多线程机制,正是建立在操作系统原生线程的基础之上,对应不同的操作系统,有着不同的实现。然而最终,python提供了统一的接口来使用多线程,这个接口就是threading模块,threading模块底层依赖于_thread

16.2 初见python _thread

我们说threading是我们使用多线程经常使用的模块,这个模块是以一个py文件的形式存在的,但是这个模块依赖于_thread,而这个_thread模块底层是由c所写。我们来看看这个_thread模块

《python解释器源码剖析》第16章--python的多线程机制

注意:这个模块是c写的,内嵌在解释器里面,我们可以import调用,但是在python的安装目录里面却看不到。像这种底层是c写的、内嵌在解释器里面的模块、或者那些非常著名的模块里面的、无法使用文本打开查看的pyd文件,pycharm都提前给你做了一个抽象,并且把注释给你写好。只不过里面都是pass、或者返回零值。

然后我们通过源码看看python底层_thread的实现。还记得我们在第0章的时候,说的python源码的Modules目录吗?我们说这个目录里面都存放了许多模块,当然都是用c实现的,编译完python之后,这些c写的模块就会内嵌在python解释器里面。主要是针对那些对性能要求比较高的,所以使用的是底层c写好的内嵌在解释器里面的模块,至于对性能要求不高的则直接使用python实现了,放在Lib目录下,不记得的可以回头看一下。当然这些底层的模块在python的层面上也都是可以调用的,但只不过我们一般不直接调用。比如_random_collections_thread,我们肯定不会直接调用这些解释器里面的模块,而是会调用randomcollectionsthreading,然后让它们调用对应的解释器的模块。说了这么多,只是为引出_thread是在Modules里面。玛德,前戏真长啊。

python中_thread的底层实现是在_threadmodule.c中,我们来看看它都提供了哪些接口

//Modules/_threadmodule.c
static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL,                      NULL}           /* sentinel */
};

我们看第一个start_new_thread和第二个start_new,意外发现它们竟然都对应thread_PyThread_start_new_thread这个函数,这些接口和_thread.py中对应的是一致的。

16.3 线程的创建

当我们使用threading模块创建一个线程的时候,threading会调用_thread模块来创建,而在_thread中显然是通过这里面的start_new_thread对应的thread_PyThread_start_new_thread创建的,下面我们就来看看这个函数。

//_threadmodule.c
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;
    
    //这里都是进行参数检测的,如果使用多线程的话,下面的这些检测应该会很熟悉
    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    if (!PyCallable_Check(func)) {
        //target
        PyErr_SetString(PyExc_TypeError,
                        "first arg must be callable");
        return NULL;
    }
    if (!PyTuple_Check(args)) {
        //args
        PyErr_SetString(PyExc_TypeError,
                        "2nd arg must be a tuple");
        return NULL;
    }
    if (keyw != NULL && !PyDict_Check(keyw)) {
        //kwargs
        PyErr_SetString(PyExc_TypeError,
                        "optional 3rd arg must be a dictionary");
        return NULL;
    }
    
    //创建bootstate结构
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    //获取进程对象
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    
    //初始化多线程环境,记住这一步
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    //创建线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    if (ident == PYTHREAD_INVALID_THREAD_ID) {
        PyErr_SetString(ThreadError, "can't start new thread");
        Py_DECREF(func);
        Py_DECREF(args);
        Py_XDECREF(keyw);
        PyThreadState_Clear(boot->tstate);
        PyMem_DEL(boot);
        return NULL;
    }
    return PyLong_FromUnsignedLong(ident);
}

因此在这个函数中,我们看到python虚拟机通过三个主要的动作完成一个线程的创建。

  • 创建并初始化bootstate结构boot,在boot中,将保存线程的一切信息,如:线程过程,线程过程的参数等。
  • 初始化python的多线程环境
  • 以boot为参数,创建子线程,子线程也会对应操作系统的原生线程

另外我们看到了这一步:boot->interp = PyThreadState_GET()->interp;,说明boost保存了python的PyInterpreterState对象,这个对象中携带了python的对象池(module pool)这样的全局信息,python中所有的thread都会保存这些全局信息

我们在下面还看到了多线程环境的初始化动作,这一点需要注意,python在启动的时候是不支持多线程的。换言之,python中支持多线程的数据结构、GIL都是没有被创建的。因为对多线程的支持是需要代价的,如果上来就激活了多线程,但是程序却只有一个主线程,那么python仍然会执行所谓的线程调度机制,只不过调度完了还是它自己,所以这无异于在做无用功。因此python将开启多线程的权利交给了程序员,自己在启动的时候是单线程的,既然是单线程,还是那句话此时就不存在线程调度了、当然也没有GIL。一旦用户调用了threading.Thread(...).start() -> _thread.start_new_thread(),则代表明确地指示虚拟机要创建新的线程了,这个时候python虚拟机就知道自己该创建与多线程相关的东西了,如:数据结构、环境、以及那个至关重要的GIL。

16.3.1 建立多线程环境

多线程环境的建立,说的直白一点,主要就是创建GIL。我们已经知道了GIL对于python的多线程机制的重要意义,那么这个GIL是如何实现的呢?这是一个比较有趣的问题,下面我们就来看看gil长什么样子吧。

//Python/ceval_gil.h
#define DEFAULT_INTERVAL 5000

//include/internal/ceval.h
struct _ceval_runtime_state {
    int recursion_limit;
    int tracing_possible;
    _Py_atomic_int eval_breaker;
    _Py_atomic_int gil_drop_request;
    struct _pending_calls pending;
    struct _gil_runtime_state gil;  //我们看到gil是一个_gil_runtime_state结构体类型
};

//Include/internal/gil.h
struct _gil_runtime_state {
    //一个线程拥有gil的间隔,默认是5000毫秒,也就是我们上面用sys.getswitchinterval()得到的0.05
    unsigned long interval;
    
    //最后一个持有GIL的PyThreadState(线程),
    //这有助于我们知道在丢弃GIL后是否还有其他线程被调度
    _Py_atomic_address last_holder;
    
    //GIL是否被获取,如果未被获取这个值为-1,这个是原子性的,因为在ceval.c中不需要任何锁就能够读取它
    _Py_atomic_int locked;
    
    //从GIL创建之后,总共切换的次数
    unsigned long switch_number;
    
    //cond允许一个或多个线程等待,直到GIL被释放
    //mutex则是负责保护上面的变量
    PyCOND_T cond;
    PyMUTEX_T mutex;
    
#ifdef FORCE_SWITCHING
    PyCOND_T switch_cond;
    PyMUTEX_T switch_mutex;
#endif
};


//所以我们看到gil是_gil_runtime_state类型,然后内嵌在结构体_ceval_runtime_state里面
/*
gil是一个结构体,根据里面的gil.locked判断这个gil有没有人获取
而这个locked可以看成是一个布尔变量,其访问受到gil.mutex保护,是否改变则取决于gil.cond

在持有gil的线程中,主循环(PyEval_EvalFrameEx)必须能通过另一个线程来按需释放gil
*/

并且我们知道在创建多线程的时候,首先是需要调用PyEval_InitThreads()进行初始化的。我们就来看看这个函数

//ceval.c
void
PyEval_InitThreads(void)
{   
    //如果gil已经创建,那么直接返回
    if (gil_created())
        return;
    
    //创建gil
    create_gil();
    //gil创建了,那么就要拿到这个gil
    take_gil(PyThreadState_GET());
    //获取当前线程的id,显然就是主线程
    _PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
    if (!_PyRuntime.ceval.pending.lock)
        //拿到gil了,那么不好意思这个时候要锁上
        _PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
}
//ceval_gil.h
static int gil_created(void)
{
    return (_Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
                                     _Py_memory_order_acquire)
            ) >= 0;
}
//我们看到这个gil_created就是用来检测gil有没有被创建的

static void create_gil(void)
{
    //这里是创建gil的
    //我们看到这里是初始化gil里面的成员的
    MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
    MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
#endif
    COND_INIT(_PyRuntime.ceval.gil.cond);
#ifdef FORCE_SWITCHING
    COND_INIT(_PyRuntime.ceval.gil.switch_cond);
#endif
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
    _Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
    _Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
                              _Py_memory_order_release);
}


//获取gil
static void take_gil(PyThreadState *tstate)
{
    int err;
    if (tstate == NULL)
        Py_FatalError("take_gil: NULL tstate");

    err = errno;
    MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);

    if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
        //判断是否已经上锁,如果没上锁,那么直接获取
        goto _ready;

    while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
        //如果上锁了,那么一直执行这个循环
        int timed_out = 0;
        unsigned long saved_switchnum;
        ...
        ...    
    }
_ready:
#ifdef FORCE_SWITCHING
    /* This mutex must be taken before modifying
       _PyRuntime.ceval.gil.last_holder (see drop_gil()). */
    MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
    /* We now hold the GIL */
    //获取到gil的时候,那么会通过_Py_atomic_store_relaxed对其再次上锁
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
    _Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);

    if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
                    &_PyRuntime.ceval.gil.last_holder))
    {
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                 (uintptr_t)tstate);
        ++_PyRuntime.ceval.gil.switch_number;
    }
    ...
}

最后我们再来看看PyThread_allocate_lock,这个函数分为两种,Windows和linux各不相同。

//thread_nt.h

typedef struct _NRMUTEX
{
    PyMUTEX_T cs;
    PyCOND_T cv;
    int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;

PNRMUTEX
AllocNonRecursiveMutex()
{
    PNRMUTEX m = (PNRMUTEX)PyMem_RawMalloc(sizeof(NRMUTEX));
    ...
    m->locked = 0;
    return m;
    ...
}

PyThread_type_lock
PyThread_allocate_lock(void)
{
    PNRMUTEX aLock;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();  //判断有没有被初始化,如果没有初始化,那么进行初始化
    
    //这里可以简单认为返回了0
    aLock = AllocNonRecursiveMutex() ;

    dprintf(("%lu: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));

    return (PyThread_type_lock) aLock;
}

//thread_pthread.h
PyThread_type_lock
PyThread_allocate_lock(void)
{
    sem_t *lock;
    int status, error = 0;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = NULL;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", lock));
    return (PyThread_type_lock)lock;
}

python的多线程机制和平台有关系,需要进行统一的封装。python的线程在获取gil的时候,会检查当前gil是否可用。而其中的locked域就是指示当前gil是否可用,如果这个值为-1,那么代表可用,那么会获取gil,一旦获取就必须要将gil的locked设置为0,表示当前gil已被一个线程占用。一旦当该线程释放gil的时候,就一定要将该值减去1,这样gil的值才会从0变成-1,才能被其他线程使用。所以官方把gil的locked说成是布尔类型也不是没道理的。

最终在一个线程释放gil时,会通知所有在等待gil的线程,这些线程会被操作系统唤醒。但是这个时候会选择哪一个线程执行呢?之前说了,这个时候python会直接借用操作系统的调度机制随机选择一个。

16.3.2 创建线程

16.3.2.1 子线程的诞生

上面的是多线程环境的初始化,完成之后python会开始创建底层平台的原生线程。之前说过,python虽然是对操作系统原生线程的一个抽象,但它和原生线程是一一对应的。下面我们来看看一个子线程是如何被创建的。

//_threadmodule.c
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot; 
    unsigned long ident;

    ...
    //创建bootstate结构
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    ...
    //初始化多线程环境
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    //创建线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    ...
    return PyLong_FromUnsignedLong(ident);
}

//thread_nt.h
unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    HANDLE hThread;
    unsigned threadID;
    callobj *obj;
    if (!initialized)
        PyThread_init_thread();

    obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
    if (!obj)
        return PYTHREAD_INVALID_THREAD_ID;
    obj->func = func;
    obj->arg = arg;
    PyThreadState *tstate = PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    hThread = (HANDLE)_beginthreadex(0,
                      Py_SAFE_DOWNCAST(stacksize, Py_ssize_t, unsigned int),
                      bootstrap, obj,
                      0, &threadID);
    if (hThread == 0) {
        /* I've seen errno == EAGAIN here, which means "there are
         * too many threads".
         */
        int e = errno;
        dprintf(("%lu: PyThread_start_new_thread failed, errno %d\n",
                 PyThread_get_thread_ident(), e));
        threadID = (unsigned)-1;
        HeapFree(GetProcessHeap(), 0, obj);
    }
    else {
        dprintf(("%lu: PyThread_start_new_thread succeeded: %p\n",
                 PyThread_get_thread_ident(), (void*)hThread));
        CloseHandle(hThread);
    }
    return threadID;
}

主线程通过thread_PyThread_start_new_thread完成创建子线程的工作。而在PyThread_start_new_thread中,我们先看一下函数的参数,这个func和arg其实就是thread_PyThread_start_new_thread里面的t_bootstrapboot,在boot中,保存着线程的相关信息。

而在PyThread_start_new_thread吗,我们看到了obj -> func = func; obj -> arg = arg,说明把func和arg都打包到callobj obj里面去了,我们来看看这个callobj结构体长什么样子

typedef struct {
    void (*func)(void*);
    void *arg;
} callobj;

16.3.2.2 线程状态保护机制

要剖析线程状态的保护机制,我们首先需要回顾一下线程状态。在python中肯定要记录对应线程的状态信息,这个对象就是PyThreadState。

每一个PyThreadState对象中都保存着当前的线程的PyFrameObject、线程id这样的信息,因为这些信息是需要被线程访问的。假设线程A访问线程对象,但是线程对象里面 存储的却是B的id,就完蛋了。因此python内部必须有一套机制,这套机制与操作系统管理进程的机制非常类似。在线程切换的时候,会保存当前线程的上下文,并且还能够进行恢复。在python内部,维护这一个全局变量,当前活动线程所对应的线程状态对象就保存在该变量里。当python调度线程时,会将被激活的线程所对应的线程状态对象赋给这个全局变量,让其始终保存活动线程的状态对象。

但是这样就引入了一个问题:python如何在调度线程时,获得被激活线程对应的状态对象。其实python内部会通过一个单项链表来管理所有的python线程状态对象,当需要寻找一个线程对应的状态对象时,就遍历这个链表,搜索其对应的状态对象。

而对这个状态对象链表的访问,则不必在gil的保护下进行。因为对于这个状态对象链表,python会专门创建一个独立的锁,专职对这个链表进行保护,而且这个锁的创建是在python初始化的时候完成的。

《python解释器源码剖析》第16章--python的多线程机制

16.3.2.3 从gil到字节码解释器

创建线程对象是通过PyThreadState_New函数创建的

//pystate.c
PyThreadState *
PyThreadState_New(PyInterpreterState *interp)
{
    return new_threadstate(interp, 1);
}

//new_threadstate里面会使用到
static struct _frame *
threadstate_getframe(PyThreadState *self)
{
    return self->frame;
}

static PyThreadState *
new_threadstate(PyInterpreterState *interp, int init)
{   
    //创建线程对象
    PyThreadState *tstate = (PyThreadState *)PyMem_RawMalloc(sizeof(PyThreadState));
    
    //用于获取当前线程的frame
    if (_PyThreadState_GetFrame == NULL)
        _PyThreadState_GetFrame = threadstate_getframe;
    
    //下面是线程的相关属性
    if (tstate != NULL) {
        tstate->interp = interp;

        tstate->frame = NULL;
        tstate->recursion_depth = 0;
        tstate->overflowed = 0;
        tstate->recursion_critical = 0;
        tstate->stackcheck_counter = 0;
        tstate->tracing = 0;
        tstate->use_tracing = 0;
        tstate->gilstate_counter = 0;
        tstate->async_exc = NULL;
        tstate->thread_id = PyThread_get_thread_ident();

        tstate->dict = NULL;

        tstate->curexc_type = NULL;
        tstate->curexc_value = NULL;
        tstate->curexc_traceback = NULL;

        tstate->exc_state.exc_type = NULL;
        tstate->exc_state.exc_value = NULL;
        tstate->exc_state.exc_traceback = NULL;
        tstate->exc_state.previous_item = NULL;
        tstate->exc_info = &tstate->exc_state;

        tstate->c_profilefunc = NULL;
        tstate->c_tracefunc = NULL;
        tstate->c_profileobj = NULL;
        tstate->c_traceobj = NULL;

        tstate->trash_delete_nesting = 0;
        tstate->trash_delete_later = NULL;
        tstate->on_delete = NULL;
        tstate->on_delete_data = NULL;

        tstate->coroutine_origin_tracking_depth = 0;

        tstate->coroutine_wrapper = NULL;
        tstate->in_coroutine_wrapper = 0;

        tstate->async_gen_firstiter = NULL;
        tstate->async_gen_finalizer = NULL;

        tstate->context = NULL;
        tstate->context_ver = 1;

        tstate->id = ++interp->tstate_next_unique_id;

        if (init)
            //其它的都是设置属性,我们在前面章节已经见过了
            //之所以又拿出来,是因为关键的这一步
            _PyThreadState_Init(tstate);

        HEAD_LOCK();
        tstate->prev = NULL;
        tstate->next = interp->tstate_head;
        if (tstate->next)
            tstate->next->prev = tstate;
        interp->tstate_head = tstate;
        HEAD_UNLOCK();
    }

    return tstate;
}

//这一步就是表示将线程对应的线程对象放入到我们刚才说的那个"线程状态对象链表"当中
void
_PyThreadState_Init(PyThreadState *tstate)
{
    _PyGILState_NoteThreadState(tstate);
}

这里有一个特别需要注意的地方,就是当前活动的python线程不一定获得了gil。比如主线程获得了gil,但是子线程还没有申请gil,那么操作系统也不会将其挂起。由于主线程和子线程都对应操作系统的原生线程,所以操作系统系统是可能在主线程和子线程之间切换的,因为操作系统级别的线程调度和python级别的线程调度是不同的。当所有的线程都完成了初始化动作之后,操作系统的线程调度和python的线程调度才会统一。那时python的线程调度会迫使当前活动线程释放gil,而这一操作会触发操作系统内核的用于管理线程调度的对象,进而触发操作系统对线程的调度。所以我们说,python对线程的调度是交给操作系统的(使用的是操作系统内核调度线程的调度机制),当操作系统随机选择一个线程的时候,python就会根据这个线程去线程状态对象链表当中找到对应的线程状态对象,并赋值给那个保存当前线程活动状态对象的全局变量。从而开始获取gil,执行字节码,执行一段时间,再次被强迫释放gil,然后操作系统再次调度,选择一个线程,再获取对应的线程状态对象,然后该线程获取gil,执行一段时间字节码,再次被强迫释放gil,然后操作系统再次随机选择,依次往复。。。。。。

显然,当子线程还没有获取gil的时候,相安无事。然而一旦PyThreadState_New之后,多线程机制初始化完成,那么子线程就开始互相争夺话语权了。

//Modules/_threadmodule.c
static void
t_bootstrap(void *boot_raw)
{   
    //线程信息都在里面
    struct bootstate *boot = (struct bootstate *) boot_raw;
    //线程状态对象
    PyThreadState *tstate;
    PyObject *res;
    
    //获取线程状态对象
    tstate = boot->tstate;
    //拿到线程id
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    //下面说
    PyEval_AcquireThread(tstate);
    //进程的线程数量+1
    tstate->interp->num_threads++;
    //执行字节码
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    ...
}

这里面有一个PyEval_AcquireThread,之前我们没有说,但如果我要说它是做什么的你就知道了。在PyEval_AcquireThread子线程进行了最后的冲刺,于是在里面它通过PyThread_acquire_lock争取gil。到了这一步,子线程将自己挂起了,操作系统没办法靠自己的力量将其唤醒,只能等待python的线程调度机制强迫主线程放弃gil后,触发操作系统内核的线程调度,子线程才会被唤醒。然而当子线程被唤醒之后,主线程却又陷入了苦苦的等待当中,同样苦苦地守望者python强迫子线程放弃gil的那一刻。(假设我们这里只有一个主线程和一个子线程)

当子线程被python的线程调度机制唤醒之后,它所做的第一件事就是通过PyThreadState_Swap将python维护的当前线程状态对象设置为其自身的状态对象,就如同操作系统进程的上下文环境恢复一样。这个PyThreadState_Swap我们也没有介绍,因为有些东西我们只需要知道是干什么的就行。

子线程获取了gil之后,还不算成功,因为它还没有进入字节码解释器(想象成大大的for循环,里面有一个巨大的switch)。当python线程唤醒子线程之后,子线程将回到t_bootstrap,并进入PyObject_Call,从这里一路往前,最终调用PyEval_EvalFrameEx,才算是成功。因为PyEval_EvalFrameEx执行的是字节码指令,而python最终执行的也是一个字节码文件,所以此时才算是真正的执行(高潮),之前的都只能说是初始化(前戏)。当进入PyEval_EvalFrameEx的那一刻,子线程就和主线程一样,完全受python线程度调度机制控制了。

16.4 python线程调度

16.4.1 标准调度

当主线程和子线程都进入了python解释器后,python的线程之间的切换就完全由python的线程调度机制掌控了。python的线程调度机制肯定是在python解释器核心PyEval_EvalFrameEx里面的,因为线程是在执行字节码的时候切换的,那么肯定是在PyEval_EvalFrameEx里面。而在分析字节码的时候,我们看到过PyEval_EvalFrameEx,尽管说它是字节码执行的核心,但是它实际上调用了其它的函数,但毕竟是从它开始的,所以我们还是说字节码核心是PyEval_EvalFrameEx。总之,在分析字节码的时候,我们并没有看线程的调度机制,那么下面我们就来分析一下。

//ceval.c
PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
    PyThreadState *tstate = PyThreadState_GET();
    return tstate->interp->eval_frame(f, throwflag);
}

//pystate.c
PyInterpreterState *
PyInterpreterState_New(void)
{   
    //PyEval_EvalFrameEx调用eval_frame就是_PyEval_EvalFrameDefault
    interp->eval_frame = _PyEval_EvalFrameDefault;
}

//ceval.c
PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
    for (;;) {
        /* Give another thread a chance */
        if (PyThreadState_Swap(NULL) != tstate)
            Py_FatalError("ceval: tstate mix-up");
        //释放gil,给其他线程一个机会
        drop_gil(tstate);

        /* Other threads may run now */
        //你一旦释放了,那么就必须要再次申请,才能等待下一次被调度。
        take_gil(tstate);       
    }
}

主线程获得了gil执行字节码,但是我们知道在python2中是通过执行字节码数量(_Py_Ticker),每执行一条字节码这个_Py_Ticker将减少1,初始为100。而在python3中,则是通过执行时间来判断的,默认是0.05秒。一旦达到了执行之间,那么主线程就会将维护当前线程状态对象的全局变量设置为NULL并释放掉gil,这时候由于等待gil而被挂起的子线程被操作系统的线程调度机制重新唤醒,从而进入PyEval_EvalFrameEx。而对于主线程,虽然它失去了gil,但是由于它没有被挂起,所以对于操作系统的线程调度机制,它是可以再次被切换为活动线程的。

即使当操作系统的调度机制将主线程切换为活动线程的时候,主线程将主动申请gil,但由于gil被子线程占有,主线程将自身挂起。从这时开始,操作系统就不能再将主线程切换为活动线程了。所以我们发现,线程释放gil并不是马上就被挂起的,而是在释放完之后重新申请gil的时候才被挂起的。然后子线程执行0.05s之后,又会释放gil,申请gil,将自身挂起。而释放gil,会触发操作系统线程调度机制,唤醒主线程,如果是多个子线程的话,那么会从挂起的主线程和其它子线程中随机选择一个恢复。当主线程执行一段时间之后,又给子线程,如此反复,从而实现对python多线程的支持。

16.4.2 阻塞调度

标准调度就是python的调度机制掌控的,每个线程都是相当公平的。但是如果仅仅只有标准调度的话,那么可以说python的多线程没有任何意义,但为什么可以很多场合使用多线程呢?就是因为调度除了标准调度之外,还存在阻塞调度。

阻塞调度是指,当某个线程遇到io阻塞的时候,会主动释放gil,让其它线程执行。因为io是不耗费cpu的,假设time.sleep,或者从网络上请求数据等等,这些都是处于io阻塞。那么我gil,当阻塞的线程可以执行了(如:sleep结束,请求的数据成功返回),那么再切换回来。除了这一种情况之外,还有一种情况,那就是线程不得不挂起,input函数等待用户输入,这个时候也不得不释放gil。

16.5 python子线程的销毁

我们创建一个子线程的时候,往往是执行一个函数,或者重写一个类继承自threading.Thread,当然python的threading模块我们后面会介绍。当一个子线程执行结束之后,python肯定是要把对应的子线程销毁的,当然销毁主线程和销毁子线程是不同的,销毁主线程必须要销毁python的运行时环境。而子线程的销毁则不需要这些动作,因此我们只看子线程的销毁。

通过前面的分析我们知道,线程的主体框架是在t_bootstrap中

//_threadmodule.c
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;
    ...
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

python首先会通过PyThreadState_Clear清理当前线程所对应的线程状态对象。所谓清理实际上比较简单,就是改变引用计数。随后,python释放gil,通过PyThreadState_DeleteCurrent

//pystate.c
void
PyThreadState_DeleteCurrent()
{
    PyThreadState *tstate = GET_TSTATE();
    if (tstate == NULL)
        Py_FatalError(
            "PyThreadState_DeleteCurrent: no current tstate");
    tstate_delete_common(tstate);
    if (_PyRuntime.gilstate.autoInterpreterState &&
        PyThread_tss_get(&_PyRuntime.gilstate.autoTSSkey) == tstate)
    {
        PyThread_tss_set(&_PyRuntime.gilstate.autoTSSkey, NULL);
    }
    SET_TSTATE(NULL);
    PyEval_ReleaseLock();
}

然后首先会删除当前的线程状态对象,然后通过PyEval_ReleaseLock释放gil。当然这只是完成了绝大部分的销毁工作,至于剩下的收尾工作就依赖于对应的操作系统了,当然这跟我们也就没关系了。

16.6 python线程的用户级互斥与同步

16.6.1 用户级互斥与同步

我们知道,python的线程在gil的控制之下,线程之间对python提供的c api访问都是互斥的,这可以看做是python的内核级的用户互斥。但是这种互斥不是我们能够控制的,内核级通过gil的实现互斥保护了内核额共享资源,比如del obj,这个是不会被打断的。但是像n += 1这种即便是由gil,由于在执行到一半的时候,碰巧gil释放了,那么也会出岔子。所以我们还需要一种互斥,也就是用户级互斥。

16.6.2 lock

实现用户级互斥的一种方法就是加锁,我们来看看python提供的锁。

static PyMethodDef lock_methods[] = {
    {"acquire_lock", (PyCFunction)lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"acquire",      (PyCFunction)lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"release_lock", (PyCFunction)lock_PyThread_release_lock,
     METH_NOARGS, release_doc},
    {"release",      (PyCFunction)lock_PyThread_release_lock,
     METH_NOARGS, release_doc},
    {"locked_lock",  (PyCFunction)lock_locked_lock,
     METH_NOARGS, locked_doc},
    {"locked",       (PyCFunction)lock_locked_lock,
     METH_NOARGS, locked_doc},
    {"__enter__",    (PyCFunction)lock_PyThread_acquire_lock,
     METH_VARARGS | METH_KEYWORDS, acquire_doc},
    {"__exit__",    (PyCFunction)lock_PyThread_release_lock,
     METH_VARARGS, release_doc},
    {NULL,           NULL}              /* sentinel */
};

这些方法我们肯定都见过,acquire表示上锁、release就是释放。假设有两个线程A和B,A线程执行了lock.acquire(),然后执行下面的代码。这个时候依旧会进行线程调度,线程B执行的时候,也遇到了lock.acquire(),那么不好意思B线程就只能在这里等着了。没错,是轮到B线程执行了,但是由于我们在用户级层面上设置了一把锁lock,而这把锁已经被A线程获取了,那么即使后面切换到B线程,但是在A还没有lock.release()的时候,B也只能卡在lock.acquire()上面。因为A先拿到了锁,那么只要A不释放,B就拿不到锁,从而一直卡在lock.acquire()上面。

用户级互斥,即便能轮到你,但是你无法前进

16.7 python的threading模块

上面说了这么多,那么我们来看看python中的threading模块,下面就是从python层面上介绍这个模块的使用方法、api

16.7.1 创建一个线程

import threading


def hello():
    print("hello world")


# 创建一个线程
t = threading.Thread(target=hello, name="线程1")
"""
target:执行的函数
args:位置参数
kwargs:关键字参数
name:线程名字
daemon:布尔类型。表示是否设置为守护线程。设置为守护线程,那么当主线程执行结束会立即自杀
        默认不是守护线程,表示主线程执行完毕但不会退出,而是等待子线程执行结束才会退出。
"""

# 我们调用threading.Thread会创建一个线程
# 介绍几个简单的属性吧

# 1.拿到线程名字,等价于t.getName()
print(t.name)

# 2.查看是否是守护线程,等价于t.isDaemon()
print(t.daemon)  # False

# 3.线程创建之后,还可以重新设置名字、或者守护线程
t.setName("线程2")
t.setDaemon(True)

print(t.getName())  # 线程2
print(t.isDaemon())  # True

16.7.2 启动线程

import threading

l = []


def hello():
    import time
    time.sleep(1)
    l.append(123)
    print("hello world")


# 创建一个线程
t = threading.Thread(target=hello, name="线程1")

# 线程创建好了,但是我们如何启动呢?
# 直接调用t.start()即可。
t.start()

print(l)
"""
[]
hello world
"""

我启动的一个子线程之后,主线程是不会等待子线程的,而是会继续往下走。因此在子线程进行append之前,主线程就已经打印了。那么如何等待子线程执行完毕之后,再让主线程往下走呢?

import threading

l = []


def hello():
    import time
    time.sleep(1)
    l.append(123)
    print("hello world")


# 创建一个线程
t = threading.Thread(target=hello, name="线程1")
t.start()

# 这里就表示必须等t这个线程执行完毕,主线程才能向下走。
# 当然这里面是可以传递一个超时时间的,如果执行完毕那么主线程往下走
# 但是执行完毕之前,时间到了,主线程也会向下走
t.join()

print(l)
"""
hello world
[123]
"""
# 由于多个线程操作系统调度,所以无法决定谁先打印。
# 但是我们看到确实是等待子线程结束之后才向下走的

# 如果我们没有写t.jojn()
# 那么主线程执行完毕之后,会在最后默认执行一个join,不然它就直接结束了。

# 如果是守护线程的话,那么就不用等了,直接结束
# 如果是多个子线程,同样的逻辑

突然发现这个模块的api实在简单,没啥可介绍的。可以直接网上搜索。

上一篇:为了更好的多线程性能,在对象创建或者更新时,若数据大于2047字节则 Python 的 GIL 会被释放。 执行计算密集型任务如压缩或哈希时释放 GIL


下一篇:python的GIL