brpc中的协程bthread源码剖析(一):Work Stealing以及任务的执行与切换

bthread是协程吗?

如果你使用过brpc,那么对bthread应该并不陌生。毫不夸张地说,brpc的精华全在bthread上了。bthread可以理解为“协程”,尽管官方文档的FAQ中,并不称之为协程(见:apache/incubator-brpc)。

若说到pthread大家都不陌生,是POSIX标准中定义的一套线程模型。应用于Unix Like系统,在Linux上pthread API的具体实现是NPTL库实现的。在Linux系统上,其实没有真正的线程,其采用的是LWP(轻量级进程)实现的线程。而bthread是brpc实现的一套“协程”,当然这并不是传统意义上的协程。就像1个进程可以开辟N个线程一样。传统意义上的协程是一个线程中开辟多个协程,也就是通常意义的N:1协程。比如微信开源的libco就是N:1的,libco属于非对称协程,区分caller和callee,而bthread是M:N的“协程”,每个bthread之间的平等的,所谓的M:N是指协程可以在线程间迁移。熟悉Go语言的朋友,应该对goroutine并不陌生,它也是M:N的。

当然准确的说法goroutine也并不等同于协程。不过由于通常也称goroutine为协程,从此种理解上来讲,bthread也可算是协程,只是不是传统意义上的协程!当然,咬文嚼字,没必要。

要实现M:N其中关键就是:工作窃取(Work Stealing)算法。不过在真正展开介绍工作窃取之前,我们先透视一下bthread的组成部分。

bthread的三个T

讲到bthread,首先要讲的三大件:TaskControl、TaskGroup、TaskMeta。以下简称TC、TG、TM。

TaskControl进程内全局唯一。TaskGroup和线程数相当,每个线程(pthread)都有一个TaskGroup,brpc中也将TaskGroup称之为 worker。而TM基本就是表征bthread上下文的真实结构体了。

虽然前面我们说bthread并不严格从属于一个pthread,但是bthread在运行的时候还是需要在一个pthread中的worker中(也即TG)被调用执行的。

从bthread_start_background讲到TM

以下开启Hard模式,我们先从最常见的bthread_start_background来导入。bthread_start_background是我们经常使用的创建bthread任务的函数,源码如下:

 

int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // start from worker
        return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

函数接口十分类似于pthread 的pthread_create()。也就是设置bthread的回调函数及其参数。

如果能获取到thread local的TG(tls_task_group),那么直接用这个tg来运行任务:start_background()。

看下start_background源码,代码不少,但可以一眼略过。

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

主要就是从资源池取出一个TM对象m,然后对他进行初始化,将回调函数fn赋进去,将fn的参数arg赋进去等等。

另外就是调用make_tid,计算出了一个tid,这个tid会作为出参返回,也会被记录到TM对象中。tid可以理解为一个bthread任务的任务ID号。类型是bthread_t,其实bthread_t只是一个uint64_t的整型,make_tid的计算生成逻辑如下:

inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
    return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}

version是即TM的成员version_butex(uint32_t*)解引用获得的一个版本号。在TM构造函数内被初始化为1,且brpc保证其用不为0。在TG的相关逻辑中会修改这个版本号。

TaskControl

先看TC是如何被创建的,TC对象的直接创建(new和初始化)是在get_or_new_task_control()中,这个函数顾名思义,就是获取TC,有则返之,无则造之。所以TC是进程内全局唯一的,也就是只有一个实例。而TG和TM是一个比一个多。

下面展示一下get_or_new_task_control的被调用链(表示函数的被调用关系),也就能更直观的发现TC是如何被创建的。

  • get_or_new_task_control
    • start_from_non_worker
      • bthread_start_background
      • bthread_start_urgent
    • bthread_timer_add

我们通常用的bthread_start_background()或者定期器bthread_timer_add()都会调用到get_or_new_task_control。

回顾一下bthread_start_background的定义:

int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // start from worker
        return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

如果能获取到thread local的TG,那么直接用这个tg来运行任务。如果获取不到TG,则说明当前还没有bthread的上下文(TC、TG都没有),所以调用start_from_non_worker()函数,其中再调用get_or_new_task_control()函数,从而创建出一个TC来。

get_or_new_task_control()的基本逻辑,直接看代码,主要地方我补充了注释:

inline TaskControl* get_or_new_task_control() {
    // 1. 全局变量TC(g_task_control)初始化原子变量
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
    // 2.1 通过原子变量进行load,取出TC指针,如果不为空,直接返回
    TaskControl* c = p->load(butil::memory_order_consume);
    if (c != NULL) {
        return c;
    }
    // 2.2. 竞争加自旋锁,重复上一操作
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    c = p->load(butil::memory_order_consume);
    if (c != NULL) {
        return c;
    }
    
    // 2. 走到这,说明TC确实为NULL,开始new一个
    c = new (std::nothrow) TaskControl;
    if (NULL == c) {
        return NULL;
    }
    // 3. 用并发度concurrency来初始化全局TC
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
        FLAGS_bthread_min_concurrency :
        FLAGS_bthread_concurrency;
    if (c->init(concurrency) != 0) {
        LOG(ERROR) << "Fail to init g_task_control";
        delete c;
        return NULL;
    }

    // 4. 将全局TC存入原子变量中
    p->store(c, butil::memory_order_release);
    return c;
}

串完上述逻辑,我们来关注一下TC的初始化操作:TaskControl::init()

TaskControl::init()

源码十分简单:

   for (int i = 0; i < _concurrency; ++i) {
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
        if (rc) {
            LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
            return -1;
        }
    }

所谓的TC的初始化,就是调用了pthread_create()创建了N个新线程。N就是前面提到的并发度:concurrency。每个pthread线程的回调函数为worker_thread,通过这个函数也便引出了本文真正的主角TaskGroup了。

TaskControl::worker_thread()

毋庸置疑,这是一个static函数(回调函数一般为static)。在这个函数中会创建了一个TaskGroup。去掉一些日志逻辑,我们来看下源码,请大家关注我加的注释部分:

void* TaskControl::worker_thread(void* arg) {
    // 1. TG创建前的处理,里面也是回调g_worker_start_fun函数来执行操作,
    // 可以通过 bthread_set_worker_startfn() 来设置这个回调函数,
    // 实际但是很少用到这个功能
    run_worker_startfn();    

    // 2. 获取TC的指针
    TaskControl* c = static_cast<TaskControl*>(arg);
    // 2.1 创建一个TG
    TaskGroup* g = c->create_group();
    TaskStatistics stat;
    if (NULL == g) {
        LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
        return NULL;
    }

    // 3.1 把thread local的tls_task_group 用刚才创建的TG来初始化
    tls_task_group = g;
    // 3.2 worker计数加1(_nworkers是bvar::Adder<int64_t>类型)
    c->_nworkers << 1;

    // 4. TG运行主任务(死循环)
    g->run_main_task();

    // 5. TG结束时返回状态信息,后面其实有输出stat的日志,这里去掉了
    stat = g->main_stat();
    // ...
  
    // 6. 各种清理操作
    tls_task_group = NULL;
    g->destroy_self();
    c->_nworkers << -1;
    return NULL;
}

通过这个函数,我们的观察视角也就从TC平稳的过渡到TG了。其中TG最主要的函数就是run_main_task()。而你们心心念念的work stealing也不远了。

TaskGroup

TG的主要成员

讲到TG先看TG的主要成员:

    size_t _steal_seed;
    size_t _steal_offset;
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;

每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。

每个TG有两个TM的队列,它们之间有啥区别呢?

通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。

再看一下ready_to_run_remote注释:

    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);

在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。

再看下ready_to_run_remote()的调用的地方。

在butex_wake()中:

    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }

start_background()中:

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
...
...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

start_background<true>的时候会调用ready_to_run_remote。在start_from_non_worker()中,会start_background<true>。


好了,言归正传。

TaskGroup::run_main_task()

run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。

void TaskGroup::run_main_task() {
    ...
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        ...
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);

三个关键函数:wait_task、sched_to、task_runner。

  1. wait_task:找到一个任务。其中会涉及工作窃取(work stealing)。
  2. sched_to:进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。
  3. task_runner:执行任务。

现在我们可以通过现在的视角切入到“work stealing”了。


工作窃取(work stealing)

work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡指的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。

TaskGroup::wait_task()

// 简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关
bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
    } while (true);
}

_pl是ParkingLot*类型,_last_plstate是pl中的state。

_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。

TaskGoup::steal_task()

    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
        _last_pl_state = _pl->get_state();
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。

TC的steal_task实现如下:

TaskControl::steal_task()

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。

通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。

那么当前TG的rq是什么时候被消费的呢?

在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。


TaskGroup::task_runner()

void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
     ...
     }

在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):

    do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;
        ... 
        // 执行TM(bthread)中的回调函数
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        ... 日志

        // 清理 线程局部变量(下面是原注释)
        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }

        // 累加版本号,且版本号不能为0(下面是原注释)
        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        // 唤醒joiner
        butex_wake_except(m->version_butex, 0);

        // _nbthreads减1(注意_nbthreads不是整型)
        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);

        // 查找下一个任务,并切换到其对应的运行时上下文
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid); 

do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。

ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。

然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。

好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,之所以把它放到最后讲,是因为会涉及一些汇编的知识,读起来可能晦涩艰深,我也没有把握讲好。


TaskGroup::sched_to()

inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

通过传入的参数:next_tid找到TM:next_meta,和对应的ContextualStack信息:stk。

然后给next_meta设置栈stk。

最后调用另外一个重载的sched_to:

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta);

源码:

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;

    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++ g->_nswitch;

记录一些数据。继续看代码,判断下一个的TM(next_meta)和当前TM(cur_meta)如果不是同一个,就去切换栈。

    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;


        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }

        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

tls_bls表示的是TM(bthread)内的局部存储。先做还原,并且赋值成下一个TM的局部存储。接着执行jump_stack()去切换栈。

上面的大if结束之后,去执行TG的remain回调函数(如果设置过)。

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }

    // Restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;

    *pg = g;

jump_stack()

src/bthread/stack_inl.h

inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}

bthread_jump_fcontext()其实是汇编函数,在bthread/context.cpp中,功能就是进行栈上下文的切换(跳转)。与之配套的还有一个bthread_make_fcontext(),负责创建bthread的栈上下文。这两个函数是实现栈上下文切换的核心。它们的代码其实并非brpc的原创,而是出自开源项目libcontext。libcontext是boost::context的简化实现。打开bthread/context.h可以看到版权声明:

/*

    libcontext - a slightly more portable version of boost::context

    Copyright Martin Husemann 2013.
    Copyright Oliver Kowalke 2009.
    Copyright Sergue E. Leontiev 2013.
    Copyright Thomas Sailer 2013.
    Minor modifications by Tomasz Wlostowski 2016.

 Distributed under the Boost Software License, Version 1.0.
      (See accompanying file LICENSE_1_0.txt or copy at
            http://www.boost.org/LICENSE_1_0.txt)

*/

其实另外一个C++协程的开源项目libgo中的Context也脱胎于此。

在context.cpp中,定义了各种平台的bthread_jump_fcontext() /bthread_make_fcontext()实现。__asm代码块是C语言文件中编写汇编语言代码的写法。

#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm (
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
"    pushq  %rbp  \n"
"    pushq  %rbx  \n"
"    pushq  %r15  \n"
"    pushq  %r14  \n"
"    pushq  %r13  \n"
"    pushq  %r12  \n"
"    leaq  -0x8(%rsp), %rsp\n"
"    cmp  $0, %rcx\n"
"    je  1f\n"
"    stmxcsr  (%rsp)\n"
"    fnstcw   0x4(%rsp)\n"
"1:\n"
"    movq  %rsp, (%rdi)\n"
"    movq  %rsi, %rsp\n"
"    cmp  $0, %rcx\n"
"    je  2f\n"
"    ldmxcsr  (%rsp)\n"
"    fldcw  0x4(%rsp)\n"
"2:\n"
"    leaq  0x8(%rsp), %rsp\n"
"    popq  %r12  \n"
"    popq  %r13  \n"
"    popq  %r14  \n"
"    popq  %r15  \n"
"    popq  %rbx  \n"
"    popq  %rbp  \n"
"    popq  %r8\n"
"    movq  %rdx, %rax\n"
"    movq  %rdx, %rdi\n"
"    jmp  *%r8\n"
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
);

这里的汇编是AT&T汇编,和Intel汇编语法不通。比如这里的mov操作,在从左到右看的。movq和popq的q表示操作的单位是四字(64位),如果是32位系统,则是movl和popl了。

    pushq  %rbp
    pushq  %rbx
    pushq  %r15
    pushq  %r14
    pushq  %r13
    pushq  %r12

常规操作,就是把函数调用方的相关寄存器入栈,也就是保存调用方的运行环境。在当前函数执行结束之后要从栈中还原数据到相应的寄存器中,从而让调用方继续执行。所以末尾有出栈操作。

在入栈之后:

leaq  -0x8(%rsp), %rsp

表示:rsp 栈顶寄存器下移 8 字节,为FPU 浮点运算预留。

另外值得一提的是bthread_jump_fcontext()函数在调用的时候是传入了3个参数,但是定义的bthread_jump_fcontext()是可以接收4个参数的。也正是因为这个第4个参数,导致了代码里有了2次跳转,分别跳转到1和2处。

先看一下函数参数和寄存器的关系:

%rdi 第1个参数
%rsi 第2个参数
%rdx 第3个参数
%rcx 第4个参数

在leaq指令之后,开始判断第四个参数的值。

    cmp  $0, %rcx
    je  1f
    stmxcsr  (%rsp)    // 保存当前MXCSR内容到rsp指向的位置
    fnstcw   0x4(%rsp) // 保存当前FPU状态字到rsp+4指向的位置
1:

如果第四个参数为0则直接跳转到1处。也就是跳过stmxcsr、fnstcw这两个指令。对于我们的场景而言,没有第四个参数也就不需要管这个。继续:

1:
    movq  %rsp, (%rdi)
    movq  %rsi, %rsp

我们知道%rdi和%rsi表示的是第一个参数和第二个参数,也就是:&from->context 和 to->context。

这两个movq指令表示的就是栈切换的核心操作,将当前的栈指针(%rsp)存储到第一个参数所指向的内存中。然后将第二个参数的值赋值给栈指针。修改栈指针,就是更改了栈顶,也就是进行了实际的栈切换操作。

接着是不太重要的代码,还是和第四个参数有关的:

    cmp  $0, %rcx
    je  2f
    ldmxcsr  (%rsp)
    fldcw  0x4(%rsp)
2:

也就是说如果第4个参数是0,则跳转到2。跳过的两条指令ldmxcsr、fldcw可以理解为是之前stmxcsr、fnstcw那两个指令的逆操作(也就是还原一下)。

2:
    leaq  0x8(%rsp), %rsp

%rsp 栈顶寄存器上移 8 字节,恢复为 FPU 浮点运算预留空间。

接着还原从栈中各个寄存器,因为是栈,所以逆向出栈。

    popq  %r12
    popq  %r13
    popq  %r14
    popq  %r15
    popq  %rbx
    popq  %rbp

在这6个popq之后还有一个popq,和前面的pushq是没有对应关系的。

    popq  %r8

是将bthread_jump_fcontext()之后该执行的指令地址,放到 %r8 寄存器中。展开一下谈谈,比如在函数A调用函数B的时候,会先把函数的返回值入栈,然后再把函数B的参数入栈。所以对应逆操作,在函数参数都出栈之后,继续出栈的数据就是函数的返回地址!

    movq  %rdx, %rax
    movq  %rdx, %rdi

%rdx表示的是函数的第三个参数,也就是是否:skip remained,当前都是0。先后存入到%rax和%rdi中。

%rax寄存器表示的是返回值。

%rdi表示的是函数第一个参数。也就是给切换完栈之后要调用的函数,准备参数。

    jmp  *%r8

跳转到返回地址,即调用方在调用完bthread_jump_fcontext()后,继续执行的指令位置。


好了。虽然洋洋洒洒几千字,其实也只能“管中窥豹,只见一斑”。不过如果本文能带你了解bthread基本的运行和调度原理,于我而言也已足够。关于bthread还有很多实现细节和相关技术没有展开,以后我会再继续剖析。brpc的源码值得我们反复阅读。

我是果冻,欢迎关注我。

公众号:编程往事

brpc中的协程bthread源码剖析(一):Work Stealing以及任务的执行与切换

 

上一篇:【Rust日报】 2019-08-01:brpc-rs - X-lab 实验室新推出的一个rpc库


下一篇:brpc中资源池/对象池的源码剖析