以swoole为例,学习如何实现协程

聊聊Swoole2.0协程

Swoole 2.0正式版发布了。2.0版本最大的更新是增加了对协程(Coroutine)的支持。正式版已同时支持PHP5和PHP7。基于Swoole2.0协程PHP开发者可以已同步的方式编写代码,底层自动进行协程调度,转变为异步IO。解决了传统异步编程嵌套回调的问题。

目前Swoole底层内置的协程客户端组件包括:udpclient、tcpclient、httpclient、redisclient、mysqlclient,基本涵盖了开发者常用的几种通信协议。协程组件只能在服务器的onConnect、onRequest、onReceive、onMessage 回调函数中使用。

注意,Swoole 2.0.5以前的版本还是灰度测试版本,可能会存在问题。 beta是因为协程是全新的版本。


协程的使用示例

/**
    只有在Server中才能使用协程。包括 http server,websocket server 和 server。
*/
$server = new Swoole\Http\Server('127.0.0.1', 9501);

/**
    触发on request事件时,SWOOLE会开辟一个协程栈,对协程栈进行初始化
 */
$server->on('Request', function ($request, $response) {
    $tcp_cli = new Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
    /**
        client在调用connect函数后,SWOOLE会将PHP上下文信息保存到当前栈内
        然后将协程挂起,待确认连接成功后,触发epoll事件,然后协程切换
        恢复PHP上下文信息,返回结果,继续执行PHP代码
     */
    if ($tcp_cli->connect('127.0.0.1', 9906) === false) {
        $response->end("connect server failed.");
        return;
    }
    $tcp_cli->send('test for the coro');
    /**
        client在调用recv函数后,SWOOLE会将PHP上下文信息保存到当前栈内
        然后将协程挂起待后端svr回包,触发epoll事件,然后协程切换
        恢复PHP上下文信息,返回结果,继续执行PHP代码
        如果后端在设定的超时时间内,未能回包,返回false
        client的errCode定为110
     */
    $ret = $tcp_cli->recv(100);
    $tcp_cli->close();
    if ($ret) {
        $response->end(" swoole response is ok");
    } else {
        $response->end(" recv failed error : {$tcp_cli->errCode}");
    }
});

$server->start();

协程的执行流程

主进程->回调函数: 嘿,有数据到了,麻烦你处理下
Note right of 回调函数: 遇到异步IO
回调函数-->主进程: 已保存状态,你执行其他任务吧
Note left of 主进程: 去执行其他任务
主进程->回调函数: 发送的数据,有消息返回啦,你处理下

咱们就以上面的示例代码为例,说一说协程的执行流程。
Http Server监听9051端口。当有相关事件发生时,如有数据到达,就会执行绑定到Request上的回调函数。在执行回调函数之前,会创建一个协程。这时,会保存CPU寄存器的状态和ZendVM Stack信息。
在回调函数执行过程中,如果遇到IO操作,如$tcp_cli->connect(,就会保存当前的状态,并让出CPU使用权。当前请求执行被挂起。
让CPU出使用权后,CPU就可以用于处理其他事件。如处理其他客户端的Request请求。
当被挂起的请求,又有新的事件发生,如上面$tcp_cli->connect()的数据已经返回。这时,会使用挂起前保存的状态信息恢复,然后继续执行回调函数。
如果在执行过程中,再次遇到IO操作,会继续执行保存状态和让出CPU使用权。

协程的意义

这些IO操作都是非阻塞的,即发送请求和获取数据分为两步。当请求发送完毕后,就会进行状态保存和让出CPU使用权。在等待请求数据返的这段时间,CPU可以执行一些其他程序。这样就可以充分利用CPU。

协程的实现

Swoole的协程是基于 setjmp 、 longjmp 实现的。Swoole为每个协程都分配了空间,用于保存协程切换时的状态信息。进行协程切换时会自动保存Zend VM的内存状态(主要是EG全局内存和vm stack)。当回调函数执行完毕后,会自动销毁分配的空间。

创建协程

什么时候会创建协程?在Server的onConnect、onRequest、onReceive、onMessage 回调函数被执行前会创建一个协程。
协程创建的方法是coro_create。相关源码可以查看swoole_coroutine.c文件。
coro_create方法中主要进行了如下操作:

int sw_coro_create(zend_fcall_info_cache *fci_cache, zval **argv, int argc, zval **retval, void *post_callback, void* params)
{
            // 为回调函数的执行做一些准备工作
            .......
        COROG.require = 1;
    // 使用setjmp开启一个协程
    if (!setjmp(*swReactorCheckPoint))
    {
            // setjmp第一次调用会进入此代码分支,执行回调函数
        zend_execute_ex(execute_data TSRMLS_CC);
        ......
        // 执行完毕后,关闭协程
        coro_close(TSRMLS_C);
        ......
        coro_status = CORO_END;
    }
    else
    {
            /**
             如果执行longjump,会调到上面的setjmp(*swReactorCheckPoint)行。
             但是,setjmp的返回值为非0。因此,longjump后,会进入此代码分支。
             让出CPU执行权。
             */
        coro_status = CORO_YIELD;
    }
    COROG.require = 0;

    return coro_status;
}

协程让出CPU执行权yield

什么时候会让出CPU执行权?当回调函数中遇到异步IO的时候,会让出CPU执行权。如,代码中的connect操作。下面,我们就以connect操作为例,看看让出CPU执行权时都做了那些操作。
connect的相关代码在swoole_coroutine.c文件中。代码如下:

static PHP_METHOD(swoole_client_coro, connect)
{
    long port = 0, sock_flag = 0;
    ......
    //nonblock async
    // 发送连接数据,无需等待对方返回数据,就执行下面代码
    if (cli->connect(cli, host, port, timeout, sock_flag) < 0)
    {
       ......
    }
    ......
    // 获取一个内存空间,用于保存当前执行的上下文信息。
    php_context *sw_current_context = swoole_get_property(getThis(), 0);
    ......
    // 保存协程信息
    coro_save(sw_current_context);
    // 让出CPU使用权
    coro_yield();
}

保存协程信息

所谓的协程信息主要就是当前的上下文执行信息。coro_save方法在swoole_coroutine.c文件中。代码如下:

sw_inline php_context *sw_coro_save(zval *return_value, php_context *sw_current_context)
{
    // 下面的代码主要是把当前的执行状态保存到之前获取的内存空间中
    zend_execute_data *current = EG(current_execute_data);
    if (ZEND_CALL_INFO(current) & ZEND_CALL_RELEASE_THIS)
    {
        zval_ptr_dtor(&(current->This));
    }
    zend_vm_stack_free_args(EG(current_execute_data));
    zend_vm_stack_free_call_frame(EG(current_execute_data));

    strncpy(SWCC(uid), COROG.uid, 20);
    SWCC(current_coro_return_value_ptr) = return_value;
    SWCC(current_execute_data) = EG(current_execute_data)->prev_execute_data;
    SWCC(current_vm_stack) = EG(vm_stack);
    SWCC(current_vm_stack_top) = EG(vm_stack_top);
    SWCC(current_vm_stack_end) = EG(vm_stack_end);
    SWCC(current_task) = COROG.current_coro;
    SWCC(allocated_return_value_ptr) = COROG.allocated_return_value_ptr;

    return sw_current_context;

}

让出CPU执行权

coro_yield方法的作用是让出CPU执行权。代码在swoole_coroutine.c文件中。

sw_inline void coro_yield()
{
    SWOOLE_GET_TSRMLS;
// 还原栈信息
#if PHP_MAJOR_VERSION >= 7
    EG(vm_stack) = COROG.origin_vm_stack;
    EG(vm_stack_top) = COROG.origin_vm_stack_top;
    EG(vm_stack_end) = COROG.origin_vm_stack_end;
#else
    EG(argument_stack) = COROG.origin_vm_stack;
    EG(current_execute_data) = COROG.origin_ex;
#endif
      // 跳转到coro_create方法中setjmp代码行。
    longjmp(*swReactorCheckPoint, 1);
}

在这个方法中主要进行了还原栈信息和longjump操作。
COROG.origin_vm_stack 这些栈信息的初始化在coro_init方法中。记录了协程执行前的状态。

恢复协程

当异步IO有数据返回后,会进行协程恢复。协程恢复的方法是coro_resume。在swoole_coroutine.c文件中。代码如下:

int sw_coro_resume(php_context *sw_current_context, zval *retval, zval *coro_retval)
{
    // 使用之前保存的协程信息恢复执行上下文环境。
    EG(vm_stack) = SWCC(current_vm_stack);
    ....
    int coro_status;
    // 设置跳转点,方便在执行过程中再遇到异步IO操作,进行跳转。
    if (!setjmp(*swReactorCheckPoint))
    {
        //coro exit
        // 继续执行回调函数
        zend_execute_ex(sw_current_context->current_execute_data TSRMLS_CC);
        coro_close(TSRMLS_C);
        coro_status = CORO_END;
    }
    else
    {
        //coro yield
        coro_status = CORO_YIELD;
    }

    if (unlikely(coro_status == CORO_END && EG(exception)))
    {
        sw_zval_ptr_dtor(&retval);
        zend_exception_error(EG(exception), E_ERROR TSRMLS_CC);
    }
    return coro_status;
}

可见,创建协程和恢复协程的整体代码结构差不多。

结束协程

当回到函数执行完毕后,会结束协程。
coro_close方法用于结束协程。源码在swoole_coroutine.c文件中。

sw_inline void coro_close(TSRMLS_D)
{
      // 释放为协程而申请的相关资源
    efree(EG(vm_stack));
    efree(COROG.allocated_return_value_ptr);
    // 恢复执行栈
    EG(vm_stack) = COROG.origin_vm_stack;
    EG(vm_stack_top) = COROG.origin_vm_stack_top;
    EG(vm_stack_end) = COROG.origin_vm_stack_end;
    --COROG.coro_num;
    swTrace("closing coro and %d remained. usage size: %zu. malloc size: %zu", COROG.coro_num, zend_memory_usage(0), zend_memory_usage(1));
}
上一篇:JAVA Appliation下取得资源文件的路径


下一篇:LVM学习之逻辑卷LV及卷组扩容VG