libuv 简单使用
来源:https://zhuanlan.zhihu.com/p/50497450
前序:说说为啥要研究libuv,其实在很久之前(大概2年前吧)玩nodejs的时候就对这个核心库非常感兴趣,不过由于当年水平确实比较菜,大概看了看之后实在没能静下心来看下去。18年初的时候,360直播云官网做了React同构,那个时候我问自己如果真有百万并发,每天亿级的访问量有没有信心保证中间node层一次不挂(或者不出任何事故),其实我到今天仍然是没有足够底气的。原因有两个吧:一是对nodejs和它底层的内容还远远不够了解,二是对监控层面做的不够好。我们大概也都知道alinode,他们早在3 4年前就在nodejs上做了很多工作,比如v8内存监控等,但是比较遗憾的是alinode至今没有开源。于是乎有了我的第一篇关于libuv的文章,后面争取还会更新nodejs、v8等相关的内容。 本文从下面几个方面来介绍libuv,通过fs、net两方面介绍libuv的思想。
如何安装、使用libuv这个框架
首先我们可以在libuv上找到libuv这个框架,在README.md里,我们就可以在Build Instructions找到安装方法,作者电脑操作系统是macox(所以后面的实例也是以linux、unix为主,不会讲windows)。我们首先把项目clone到我们的电脑上,在项目根目录执行一下的命令,在执行过程中可能会出现各种底层库没有安装的情况,按照提示自行安装就可以了,作者在执行 xcodebuild 的时候发现不能加上 -target All 的参数,不加的话可以顺利build过去。
$ ./gyp_uv.py -f xcode
$ xcodebuild -ARCHS="x86_64" -project uv.xcodeproj \
-configuration Release -target All
build完成后 我们可以在项目目录里找到 build/Release/libuv.a 文件,这个就是编译后的文件了,我们稍后会用到。 准备工作做好之后我们就可以创建一个C或者C++的工程了,在Mac上我一般使用xcode来编写oc、c、c++的项目。 首先创建一个C项目,这个时候我们需要把我们之前编译的libuv.a的文件加入到项目的依赖中,我们在Build Phases中的 Link Binary with Libraries中添加libuv.a的路径,同时我们需要在项目根目录引入uv.h等文件头。准备工作做好之后,我们就开始学习怎么写标准的hello world了 哈哈哈哈。
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
int main() {
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
uv_loop_init(loop);
printf("Now quitting.\n");
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
free(loop);
return 0;
}
上述代码仅仅初始化了一个loop循环,并没有执行任何内容,然后就close且退出了。虽然上述代码并没有利用libuv的async功能,但是给我们展示了 uv_loop_init uv_run 两个核心函数。我们稍后会介绍他们做了什么。
先从一个数据结构开始
在开始介绍整个整个libuv之前,我不得不首先介绍一个数据结构,因为这个数据结构在libuv里无处不在,这个数据结构就是--循环双向链表。 我们在项目根目录下的src目录可以找到queue.h的头文件。不错,这个数据结构就是用宏实现的,那我让我们一起来学习一下什么是链表。
链表的定义:
链表是一种物理存储单元上非连续、非顺序的存储结构
那什么是双向链表呢?
双向链表其实就是头尾相连
那什么是双向循环链表呢?
看图我们就明白了,所谓的循环链表就是把头尾相连。
来看一下 queue.h 是怎么实现的
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
上述代码我只截取了部分的实现 其实这里我只想讲两个点 1:QUEUE_NEXT 的实现
(*(QUEUE **) &((*(q))[0]))
在这个宏里,他为什么用这个复杂的方式来实现呢? 其实他有两个目的:强制类型转换、成为左值
*(q))[0]
这个步骤是取到数组的第一个元素
(QUEUE **)
这个步骤进行强制类型转换
(*(nnn) &(xxx))
这个步骤目的就是为了使xxx成为左值
2:QUEUE_DATA 获取链表的值 巧妙的使用了地址的偏移量来完成
来看一个使用queue.h的demo吧
#include "queue.h"
#include <stdio.h>
static QUEUE* q;
static QUEUE queue;
struct user_s {
int age;
char* name;
QUEUE node;
};
int main() {
struct user_s* user;
struct user_s john;
struct user_s henry;
john.name = "john";
john.age = 44;
henry.name = "henry";
henry.age = 32;
QUEUE_INIT(&queue);
QUEUE_INIT(&john.node);
QUEUE_INIT(&henry.node);
QUEUE_INIT(&willy.node);
QUEUE_INIT(&sgy.node);
((*(&queue))[0]) = john.node;
(*(QUEUE **) &((*(&queue))[0])) = &john.node;
QUEUE_INSERT_TAIL(&queue, &john.node);
QUEUE_INSERT_TAIL(&queue, &henry.node);
q = QUEUE_HEAD(&queue);
user = QUEUE_DATA(q, struct user_s, node);
printf("Received first inserted user: %s who is %d.\n",
user->name, user->age);
QUEUE_REMOVE(q);
QUEUE_FOREACH(q, &queue) {
user = QUEUE_DATA(q, struct user_s, node);
printf("Received rest inserted users: %s who is %d.\n",
user->name, user->age);
}
return 0;
}
从上面代码可以总结出5个方法 QUEUE_INIT 队列初始化 QUEUE_INSERT_TAIL 插入到队尾 QUEUE_HEAD 头部第一个元素 QUEUE_DATA 获得元素的内容 QUEUE_REMOVE 从队列中移除元素
那双向循环链表就先简单介绍到这。
libuv的核心
libuv为什么可以这么高效呢?实际他使用了操作系统提供的高并发异步模型
linux: epoll
freebsd: kqueue
windows: iocp
每个我们常见的操作系统都为我们封装了类似的高并发异步模型,那libuv其实就是对各个操作系统进行封装,最后暴露出统一的api供开发者调用,开发者不需要关系底层是什么操作系统,什么API了。 我们来看一下同步模型和异步模型的区别
阻塞模型
我们在一个线程中调用网络请求,之后线程就会被阻塞,直到返回结果才能继续执行线程
异步模型
在异步模型中 我们调用网络请求后不在去直接调用accept阻塞线程,而是轮询fd是否发生变化,在返回内容后我们在调用cb执行我们的代码,这个过程是非阻塞的。 说了这么多我们通过2个例子了解一下其中的原理。
学习如何建立一个socket
我们首先了解一下 C是如何创建socket的,之后我们在看一下如果通过高并发异步模型来创建socket,最后我们在了解一下 libuv下怎么创建socket。
C如何创建一个socket呢?
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>
#define MYPORT 8887
#define QUEUE 20
#define BUFFER_SIZE 1024
int main()
{
//定义sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX协议) AF_ROUTE(路由套接字) AF_KEY(秘钥套接字)
// SOCK_STREAM(字节流套接字) SOCK_DGRAM
int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
///定义sockaddr_in
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(MYPORT);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
///bind,成功返回0,出错返回-1
if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
{
perror("bind");
exit(1);
}
printf("监听%d端口\n", MYPORT);
///listen,成功返回0,出错返回-1
if(listen(server_sockfd, QUEUE) == -1)
{
perror("listen");
exit(1);
}
///客户端套接字
char buffer[BUFFER_SIZE];
struct sockaddr_in client_addr;
socklen_t length = sizeof(client_addr);
printf("等待客户端连接\n");
///成功返回非负描述字,出错返回-1
int conn = accept(server_sockfd, (struct sockaddr*)&client_addr, &length);
if(conn<0)
{
perror("connect");
exit(1);
}
printf("客户端成功连接\n");
while(1)
{
memset(buffer,0,sizeof(buffer));
long len = recv(conn, buffer, sizeof(buffer), 0);
//客户端发送exit或者异常结束时,退出
;
if(strcmp(buffer,"exit\n")==0 || len<=0) {
printf("出现异常");
break;
}
printf("来自客户端数据:\n");
fwrite(buffer, len, 1, stdout);
send(conn, buffer, len, 0);
printf("发送给客户端数据:\n");
fwrite(buffer, len, 1, stdout);
}
close(conn);
close(server_sockfd);
return 0;
}
代码一大坨,其实上我们简单拆分一下
第一步:创建socket 文件描述符
第二步:定义socket addr
第三步:绑定文件描述符和地址 bind
第四步:监听文件描述符 listen
第五步:等待socket返回内容 accept
第六步:接收信息 recv
那我们如何使用kqueue来创建socket呢?
由于作者电脑是macos,所以只能使用kqueue,不能使用epoll。
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>
#define MYPORT 8887
#define QUEUE 20
#define BUFFER_SIZE 1024
int main()
{
// 定义sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX协议) AF_ROUTE(路由套接字) AF_KEY(秘钥套接字)
// SOCK_STREAM(字节流套接字) SOCK_DGRAM
int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 定义sockaddr_in
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(MYPORT);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
// bind,成功返回0,出错返回-1
if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
{
perror("bind");
exit(1);
}
printf("监听%d端口\n", MYPORT);
// listen,成功返回0,出错返回-1
if(listen(server_sockfd, QUEUE) == -1)
{
perror("listen");
exit(1);
}
//创建一个消息队列并返回kqueue描述符
int kq = kqueue();
struct kevent change_list; //想要监控的事件
struct kevent event_list[10000]; //用于kevent返回
char buffer[1024];
int nevents;
// 监听sock的读事件
EV_SET(&change_list, server_sockfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
while(1) {
printf("new loop...\n");
// 等待监听事件的发生
nevents = kevent(kq, &change_list, 1, event_list, 2, NULL);
if (nevents < 0) {
printf("kevent error.\n"); // 监听出错
} else if (nevents > 0) {
printf("get events number: %d\n", nevents);
for (int i = 0; i < nevents; ++i) {
printf("loop index: %d\n", i);
struct kevent event = event_list[i]; //监听事件的event数据结构
int clientfd = (int) event.ident; // 监听描述符
// 表示该监听描述符出错
if (event.flags & EV_ERROR) {
close(clientfd);
printf("EV_ERROR: %s\n", strerror(event_list[i].data));
}
// 表示sock有新的连接
if (clientfd == server_sockfd) {
printf("new connection\n");
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(server_sockfd, (struct sockaddr *) &client_addr, &client_addr_len);
long len = recv(new_fd, buffer, sizeof(buffer), 0);
char remote[INET_ADDRSTRLEN];
printf("connected with ip: %s, port: %d\n",
inet_ntop(AF_INET, &client_addr.sin_addr, remote, INET_ADDRSTRLEN),
ntohs(client_addr.sin_port));
send(new_fd, buffer, len, 0);
}
}
}
}
return 0;
}
我们可以看到,listen之前都是一样的,不在赘述,简化一下后面的步骤
第一步:创建 kqueue描述符
第二部:监听socket读事件 EV_SET
第三步:绑定kq 和 change_list kevent
一直while循环直到kevent返回可以的文件描述符数量 那到这里其实我们就完全弄懂了 如何直接用C写出高并发异步是怎么运行的。那么我们就看看使用libuv的例子吧
使用libuv的scoket
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#define DEFAULT_PORT 7000
#define DEFAULT_BACKLOG 128
uv_loop_t *loop;
struct sockaddr_in addr;
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
void free_write_req(uv_write_t *req) {
write_req_t *wr = (write_req_t*) req;
free(wr->buf.base);
free(wr);
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = (char*) malloc(suggested_size);
buf->len = suggested_size;
}
void on_close(uv_handle_t* handle) {
free(handle);
}
void echo_write(uv_write_t *req, int status) {
if (status) {
fprintf(stderr, "Write error %s\n", uv_strerror(status));
}
free_write_req(req);
}
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
if (nread > 0) {
write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
req->buf = uv_buf_init(buf->base, nread);
fwrite(buf->base, 30, 1, stdout);
uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write);
return;
}
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) client, on_close);
}
free(buf->base);
}
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
else {
uv_close((uv_handle_t*) client, on_close);
}
}
int main() {
loop = uv_default_loop();
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
实际上整体我们都可以把libuv和我们原生的c kqueue进行一一对应,发现相差不多,唯一不同是我们需要定义 uv_loop 这个内部循环,后面我们在来讲套循环机制。
学习如何进行文件读写
我们学习完了网络,那么我们再来看看文件i/o是怎么处理的。
刚刚我们玩转了socket来看这张图是不是很熟悉?但是发现右侧有了很大的不同。文件操作、DNS、用户代码不是基于epoll这种模型吗? 显而易见我们有了答案,这是为什么呢?其实很简单文件的很多操作就是同步的,但是libuv为了统一异步,利用开辟线程进行文件等操作模拟了异步的过程!!原来我们用了这么久才发现他是个骗子。哈哈!其实是我们学艺不精。 那其实讲到这里文件读写其实讲的差不多了,我们还是来看看例子吧!
#include <stdio.h>
#include <uv.h>
uv_fs_t open_req;
uv_fs_t _read;
static char buffer[1024];
static uv_buf_t iov;
void on_read(uv_fs_t *req) {
printf("%s\n",iov.base);
}
void on_open(uv_fs_t *req) {
printf("%zd\n",req->result);
iov = uv_buf_init(buffer, sizeof(buffer));
uv_fs_read(uv_default_loop(), &_read, (int)req->result,
&iov, 1, -1, on_read);
}
int main() {
const char* path = "/Users/sgy/koa/package.json";
// O_RDONLY 、 O_WRONLY 、 O_RDWR 、 O_CREAT
uv_fs_open(uv_default_loop(), &open_req, path, O_RDONLY, 0, on_open);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_fs_req_cleanup(&open_req);
return 0;
}
其实libuv底层对文件open和read的操作是分开的。 看到这里文件api没啥讲的了,我们来简单讲讲线程池。
线程池
线程池就是对线程的统一管理,预先创建出线程,如果有任务就把任务放到线程池里去执行。
通过上图我们可以看到有任务进来首先会插入到链表中进行排队等待, 直到线程空余就会去链表中去取。 通过阅读 src/threadpool.c文件我们可以了解 MAX_THREADPOOL_SIZE 128 最大线程为128个 default_threads[4] 默认只会开辟4个线程 如果你对底层不了解 那当你在进行大量的文件i/o时 线程池数量就是阻碍你的最大障碍。 为啥最大只能创建128个线程呢?因为大多数操作系统创建一个线程大概花费1M的内存空间,外加用户本身代码也要占用大量的内存,所以这里设置了最大128的限制。
了解libuv的循环机制
我们通过网络和文件了解了libuv,那么我们来看看libuv的循环机制
uv_loop_t *loop;
loop = uv_default_loop()
uv_run(loop, UV_RUN_DEFAULT);
首先我们会创建 loop 然后一系列的骚操作之后 最后我们执行了uv_run 嗯嗯 那uv_run 肯定是突破口了 在src/unix/core.c 文件里 我们找到了 uv_run的定义
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
从代码中 我们就可以总结出libuv的运行周期 通过while循环不断的查询 loop中是否有停止符 如果有则退出 否则就不停的进行循环。
上面的图已经清楚的描述我们uv_run的流程了 那其中的核心 就在*uvio_poll* 中 例如在 src/unix/linux-core.c 中的uvio_poll函数 我们就可以找到 我们 epoll 熟悉的身影了。实现逻辑也和我们之前使用过的差不多。
总结
洋洋洒洒写了这么多,最后总结一下也提出自己的思考。 其实libuv底层的 actor模型是非常高效的,很多游戏服务器内核也使用actor模型,那相对于火的不行的go(协程模型) nodejs一直没有在服务端发挥它的高效呢? 我觉得其实原因很简单,因为nodejs他并不高效,我觉得nodejs能够快速的被开发出来并且js运行如此高效 v8功不可没。但是成也v8败也v8,JIT优化的在好 依然和编译型语言相差甚远。 但是一点的性能是阻碍大数据等框架使用go而不是用nodejs的原因吗?我觉得其实并不是,最大的原因我觉得是生态!非常多的Apache开源框架使用java编写,很多大数据使用go来承载,nodejs有什么*生态吗?我觉得并没有,他大多数面向的是前端这个群体导致他的生态的发展。 谢谢大家能看到这里,上述的心得都是近期整理的,如果有不对的地方欢迎大家多多批评。上述内容如果转载请附带原文链接,感谢。
================= End