简介
本篇文章是介绍一个典型的在线C++服务的最底层socket管理是如何实现的。
文章会从一个最简单的利用socket编程基础API的一个小程序开始,逐步引入现在典型的select,epoll机制,并附上相关demo代码。
socket编程
基于TCP协议的网络程序
TCP协议通讯流程如下图:
最简单的TCP网络程序
服务端:
/*server.c*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAXLINE 80
#define SERV_PORT 8000
int main(void) {
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, n;
// 第一个系统调用, 建立监听句柄
// 第一个参数, AF_INET代表IPv4, AF_INET6代表IPv6, AF_UNIX代表Unix Domain Socket(本地文件)
// 第二个参数, SOCK_STREAM代表TCP, SOCK_DGRAM代表UDP
listenfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
// 同socket()系统调用第一个参数
servaddr.sin_family = AF_INET;
// 同一台机器可能有多个网卡, 一个网卡也可以绑定多个IP, 代表所有IP都绑定
servaddr.sin_addr.s_addr = htol(INADDR_ANY);
// 端口, 网络协议都是小端序, 要用这个htons系列函数将host编码转为net编码,
// intel机器都是小端, 所以一般都直接返回
servaddr.sin_port = htons(SERV_PORT);
// 第二个系统调用, 将句柄跟对应端口绑定起来
// 第一个参数, 刚刚同构socket建立的句柄
// 第二个&第三个参数, 需要绑定的端口信息
bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
// 开始监听, 20代表如果一个socket还没有被accept走的话, 可以临时挂着等待被处理的状态
listen(listenfd, 20);
printf("Acceptin connections ...\n");
while(1) {
cliaddr_len = sizeof(cliaddr);
// 获取客户端的连接句柄, 如果没链接, 会阻塞等待客户端链接
// 传出客户端句柄, 客户端连接相关信息
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
n = read(connfd, buf, MAX_LINE);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
for (i=0; i<n; ++i) {
buf[i] = toupper(buf[i]);
}
write(connfd, buf, n);
close(connfd);
}
}
客户端:
/* client.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAXLINE 80
#define SERV_PORT 8000
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;
char *str;
if (argc != 2) {
fputs("usage: ./client message\n", stderr);
exit(1);
}
str = argv[1];
// 跟服务器一样, 建立socket句柄
sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);
// 跟服务器对应的地址和端口号建立连接
// connect()和bind()函数的参数是一样的, 只是connect是连接别人, bind是绑定自己
// 客户端对应的socket不需要分配端口, 内核会自动为该句柄分配端口
connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
// 发送数据
write(sockfd, str, strlen(str));
// 读取数据
n = read(sockfd, buf, MAXLINE);
printf("Response from server:\n");
write(STDOUT_FILENO, buf, n);
close(sockfd);
return 0;
}
简单程序逐步优化
假设在如上的client.c中, 将write到close这一段修改为:
while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(sockfd, buf, strlen(buf));
n = Read(sockfd, buf, MAXLINE);
if (n == 0)
printf("the other side has been closed.\n");
else
Write(STDOUT_FILENO, buf, n);
}
这样企图达到通过命令行交互输入字符串, 并且可以多次跟服务器交互, 但运行下来却发现, 不work, 如下图:
$ ./client
// 第一次输入, 正常返回结果
haha1
HAHA1
// 第二次输入, 无法正常返回结果
haha2
the other side has been closed.
// 第三次输入, 程序自动退出
haha3
$
原因是, 看看server.c
里面针对每个连接的处理, 是应答一次之后就把连接关闭了, 所以发生了如上现象, 那么具体发生了什么呢:
- 在第二次输入的时候, client再次调用该句柄执行write操作, 但是write操作只是把数据写入TCP发送缓冲区就算完事儿, 所以能成功返回不会出错。而server收到该请求之后发现连接已经被关闭, 所以会返回一个RST段, client收到RST段后无法立刻通知应用层, 只是把这个状态保存在TCP协议层。
- 在第三次输入的时候, client再次调用循环给server写数据, 这个时候TCP协议层已经处于RST状态, 知道了这个socket连接的对方已经关闭掉了连接, 所以会发出一个SIGPIPE信号给应用层, 而SIGPIPE信号默认是终止程序, 所以看到上面的现象
在真实线上服务, 因为一些网络异常可能会出现SIGPIPE的信号, 所以我们一般都会在服务端/客户端的程序里加上:
signal (SIGPIPE, SIG_IGN);
来避免被这种异常误杀了程序。
那么, 我们如何才能客户端可以跟服务器端多次交互呢, 一种解决方案如下, 在服务端的处理请求的时候也加一下死循环:
while(1) {
...
accept();
while(1) {
n = read();
if(n == 0) {
break;
}
...
write();
}
close();
}
但是这样的修改会导致, 服务器只能串行处理每个请求, 在上一个客户端进程未终止之前, 另外一个客户端的请求服务器是不能处理的。
那么要达到多个客户端并发处理请求的话, 一种可行的办法是每次请求来了就fork一个进程出来处理这个请求相关的逻辑, 但是这样耗费太大, 于是早些年, 先辈们提出了用select
这种系统调用来解决这个问题。
select
的原理是同时监听多个阻塞的fd(网络/文件都可), 哪个有数据到达了就处理哪个, 这样就不用fork和多进程也能搞定了。
其伪代码大概如下:
listen_fd = socket();
bind();
listen();
// select需要用到的句柄集合
fd_set all_set;
// 将listen_fd加进该集合
FD_SET(listen_fd, &all_set);
while(1) {
// 核心系统调用, 第一个参数是需要监听的所有系统句柄中最大整数值再+1
// 第二个参数是要监听读事件的set
// 第三个参数是要监听写事件的set
// 第四个参数是要监听错误事件的set
// 第五个参数是超时事件, 如果是NULL, 则一直要阻塞到发生事件, 如果是0, 则变成非阻塞函数, 不管是否有变化都立即返回
select(maxfd+1, &all_set, NULL, NULL, NULL);
// 判断该socket是否事件发生
if(FD_ISSET(listen_fd, &all_set)) {
// 有新请求到来
conn_fd = accept(...);
// 将请求连接也加到all_set当中
FD_SET(conn_fd, &all_set);
// 代码省略, 因为select无法返回有事件触发的具体fd, 所以需要将conn_fd加入另外一个数组,
// 假设该数组名为all_clients
...
}
for(i=0; i<max_clients_num; i++) {
if(FD_ISSET(all_clients[i], &all_set)) {
read(...)
write(...)
}
}
}
这样就能做到多个客户端同时跟该服务器打交道, 也能同时得到响应了。
虽然select能满足要求了, 但是先辈们仍然觉得其效率不高, 主要有如下几个原因:
- 每次调用select函数, 就得把装有所有fd的fdset都得从用户态传入内核态, 如果fd较多的时候, 开销会很大
- 每次调用select的时候, 都需要遍历一遍fdset的所有句柄, 这个开销在fd较多的时候也很大
- select支持的文件描述符太小了, 最多只能有1024
于是内核发明了epoll来取代select, 解决如上几个问题, epoll提供了如下几个接口:
- epoll_create: 创建epoll句柄
- epoll_ctl: 将要监听的fd加入epoll
- epoll_wait: 查看epoll中监听的fd的事件
那么他是如何解决如上几个问题的呢
- 因为用户是每次调用epoll_ctl将句柄加入epoll, 这样在内核态自身就保存有所有fd句柄信息了, 不用来回从用户态到内核态了
- epoll内部采用了回调机制, 每次有新事件来的时候就触发对应回调函数, 将句柄加入就绪队列, 这样其实每次epoll_wait就是从就绪队列里读句柄就好
- epoll没有这个限制, 他支持的FD上限就是最大可以打开文件的数目
用epoll来实现服务器端的伪代码大概如下:
listen_fd = socket();
bind();
listen();
// 创建epoll句柄, 告诉内核这个epoll句柄要监听句柄数量
epfd=epoll_create(256);
// epoll需要用到的结构
epoll_event ev,events[20];
// 设置要加入epoll要监听的事件的信息
ev.data.fd=listenfd;
ev.events=EPOLLIN|EPOLLET;
// 将主要的listen_fd加入epoll当中
// 第一个参数是epoll句柄
// 第二个参数是控制指令, 包括增删更新等
// 第三个和第四个参数是要加入epoll监听的句柄信息
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
while(1) {
// 第一个参数是epoll句柄
// 第二个参数是放有事件的句柄信息
// 第三个参数是每次能处理的事件
// 第四个参数是类似select的超时, -1代表阻塞, 0代表非阻塞
int nfds = epoll_wait(epfd,events,20,-1);
for(int i=0; i<nfds; ++i) {
if(events[i].data.fd == listenfd) {
conn_fd = accept(...);
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
} else(events[i].events & EPOLLIN) {
conn_fd = events[i].data.fd;
read(...)
write(...)
}
}
}
epoll核心的控制核心就在epoll_event.events这个数据结构上, 该字段支持如下值:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断
-
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
这里单独说明一下ET模式和LT模式, 默认是LT模式。ET模式就是epoll_wait读到该句柄之后, 应用程序必须立即处理该事件, 即触发后面的读取或者写入操作, 如果不处理的话, 那么下次调用epoll_wait的时候将不会返回该句柄。LT则反之, 如果应用层不处理, 下次依然会告诉应用层。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
在现在实际的线上服务中, 一般都是用的epoll来进行连接管理和事件监听。
但是如上的实例代码中, server端始终都是只有一个主进程在处理客户端的请求, 也就是说服务器处理是串行的, 即使并行请求, 在上一个请求处理完毕之前, 下一个请求是得不到响应的。
所以一般服务器都会采用多线程来处理, 多线程比如上请求会复杂一些, 一般会有一个主线程(监听线程), 多个工作线程。监听线程和工作线程之间通过一个本地队列来同步信息。
当监听线程发现有新的读请求到了之后, 就把该请求放到本地队列中, 多个工作线程就死循环check本地队列, 如果发现本地队列有新请求, 就从里面读取句柄并处理。本地队列处理读取和写入的时候, 需要考虑线程安全的问题。
参考
- Linux C编程一站式学习. http://docs.linuxtone.org/ebooks/C&CPP/c/index.html