文章目录
一、项目简介
二、项目整体认识
2、1 HTTP服务器
2、2 Reactor模型
三、预备知识
3、1 C++11 中的 bind
3、2 简单的秒级定时任务实现
3、3 正则库的简单使用
3、4 通用类型any类型的实现
四、服务器功能模块划分与实现
4、1 Buffer模块
4、2 Socket模块
4、3 Channel模块
4、4 Poller模块
4、5 Eventloop模块
4、5、1 时间轮思想
4、5、2 TimerWheel定时器模块整合
4、5、3 Channel 与 EventLoop整合
4、5、3 时间轮与EventLoop整合
4、6 Connection模块
4、7 Acceptor模块
4、8 LoopThread模块
4、9 LoopThreadPool模块
4、10 TcpServer模块
4、11 测试代码
五、HTTP协议支持实现
5、1 Util模块
5、2 HttpRequest模块
5、3 HttpResponse模块
5、4 HttpContext模块
5、5 HttpServer模块
六、对服务器进行测试
6、1 长连接测试
6、2 不完整报文请求
6、3 业务处理超时测试
6、4 一次发送多条数据测试
6、5 大文件传输测试
6、6 性能测试
????♂️ 作者:@Ggggggtm ????♂️
???? 专栏:实战项目????
???? 标题: 仿muduo库实现one thread one loop式并发服务器 ????
❣️ 寄语:与其忙着诉苦,不如低头赶路,奋路前行,终将遇到一番好风景 ❣️
一、项目简介
该项目目标是实现一个高并发的服务器。但并不是自己完全实现一个,而是仿照现有成熟的技术进行模拟实现。
一些必备知识:线程、网络套接字编程、多路转接技术(epoll),另外还有一些小的知识,在本篇文章中会提前讲解。
本项目主要分为多个模块来进行讲解,实际上就是一个个小的组件。通过这些组件,我们可以很快的搭建起来一个高并发式的服务器。
二、项目整体认识
2、1 HTTP服务器
该项目组件内提供的不同应用层协议支持,由于应用层协议有很多,我们就在项目中提供较为常见的HTTP协议组件支持。
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客户端根据自己的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
但是需要注意的是HTTP协议是⼀个运行在TCP协议之上的应用层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应用层基于HTTP协议格式进行数据的组织和解析来明确客⼾端的请求并完成业务处理。因此实现HTTP服务器简单理解,只需要以下几步即可:
- 搭建⼀个TCP服务器,接收客户端请求。
- 以HTTP协议格式进行解析请求数据,明确客户端目的。
- 明确客户端请求目的后提供对应服务。
- 将服务结果⼀HTTP协议格式进行组织,发送给客户端实现⼀个HTTP服务器很简单,但是实现⼀个高性能的服务器并不简单,这个单元中将讲解基于Reactor模式的高性能服务器实现。
2、2 Reactor模型
Reactor模式,是指通过⼀个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher模式。简单理解就是使用 I/O 多路复用 统⼀监听事件(Reactor 模式就是基于IO多路复用构建起来的),收到事件后分发给处理进程或线程,是编写高性能网络服务器的必备技术之⼀。
网络模型演化过程中,将建立连接、IO等待/读写以及事件转发等操作分阶段处理,然后可以对不同阶段采用相应的优化策略来提高性能;也正是如此,Reactor 模型在不同阶段都有相关的优化策略,常见的有以下三种方式呈现:
- 单Reactor单线程模型:单I/O多路复用+业务处理;
- 单Reactor多线程模型:单I/O多路复用+线程池;
- 多Reactor多线程模型:多I/O多路复用+线程池。
下面我们来具体分析一下其优缺点。
单Reactor单线程:在单个线程中进行事件监控并处理。具体步骤如下:
- 通过IO多路复用模型进行客户端请求监控。
- 触发事件后,进行事件处理。
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。如果是数据通信请求,则进行对应数据处理(接收数据,处理数据,发送响应)。
- 优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
- 缺点:无法有效利用CPU多核资源,很容易达到性能瓶颈。
- 适用场景:适用于客户端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导致串行处理的情况下,后处理的连接长时间无法得到响应)
单Reactor多线程:一个Reactor进行时间监控,由多个线程(线程池)来处理就绪事件。
- Reactor线程通过I/O多路复用模型进行客户端请求监控;
- 触发事件后,进行事件处理
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
- 如果是数据通信请求,则接收数据后分发给Worker线程池进行业务处理。
- 工作线程处理完毕后,将响应交给Reactor线程进行数据响应。
其优缺点如下:
- 优点:充分利用CPU多核资源
- 缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈。
多Reactor多线程:多I/O多路复用进行时间监控,同时使用线程池来对就绪时间进行处理。
- 在主Reactor中处理新连接请求事件,有新连接到来则分发到子Reactor中监控
- 在子Reactor中进行客户端通信监控,有事件触发,则接收数据分发给Worker线程池
- Worker线程池分配独立的线程进行具体的业务处理
- 工作线程处理完毕后,将响应交给子Reactor线程进行数据响应。
优点:充分利用CPU多核资源,主从Reactor各司其职。但是大家也要理解:执行流并不是越多越好,因为执行流多了,反而会增加cpu切换调度的成本。
目标定位-One Thread One Loop主从Reactor模型高并发服务器。
咱们要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效性,提高服务器的并发性能。主Reactor获取到新连接后分发给子Reactor进行通信事件监控。而子Reactor线程监控各自的描述符的读写事件进行数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
对比上个模型,One Thread One Loop主从Reactor模型高并发服务器结构图如下:
三、预备知识
3、1 C++11 中的 bind
bind也是一种函数包装器,也叫做适配器。它可以接受一个可调用对象,以及函数的各项参数,然后返回⼀个新的函数对象,但是这个函数对象的参数已经被绑定为设置的参数。运⾏的时候相当于总是调用传入固定参数的原函数。
调用bind的一般形式为:auto newCallable = bind(callable, arg_list);
解释说明:
- callable:需要包装的可调用对象。
- newCallable:生成的新的可调用对象。
- arg_list:逗号分隔的参数列表,对应给定的callable的参数。当调用newCallable时,newCallable会调用callable,并传给它arg_list中的参数。
arg_list中的参数可能包含形如_n的名字,其中n是一个整数,这些参数是“占位符”,表示newCallable的参数,它们占据了传递给newCallable的参数的“位置”。数值n表示生成的可调用对象中参数的位置,比如_1为newCallable的第一个参数,_2为第二个参数,以此类推。
此外,除了用auto接收包装后的可调用对象,也可以用function类型指明返回值和形参类型后接收包装后的可调用对象。当然,arg_list中的参数也可以绑定固定的值。下面我们来结合几个例子理解一下。
绑定固定值如下:
int Plus(int a, int b) { return a + b; } int main() { //绑定固定参数 function<int()> func = bind(Plus, 10, 10); cout << func() << endl; return 0; }
在上述代码中,func()相当于调用了Plus(10,10)。因为我们绑定了固定的两个参数值,所以直接调用即可。接下来我们再看一下使用展位符进行绑定。代码如下:
int Plus(int a, int b) { return a + b; } int main() { //绑定固定参数 function<int(int)> func = bind(Plus, placeholders::_1, 10); cout << func(2) << endl; //12 return 0; }
这里的 placeholders::_1 就是一个占位符,相当于func中传入的第一个参数。
上述的场景并不使用,一般情况我们会在对类内的成员函数进行绑定,因为在类外调用类内成员函数时,由于类内的成员函数第一个参数是都是this指针,所以很不方便调用,于是我们可以绑定一个this指针,或者匿名对象都是可以的,这样就可以正常的进行调用了。结合如下例子理解一下:
class Sub { public: int sub(int a, int b) { return a - b; } }; int main() { //绑定固定参数 function<int(int, int)> func = bind(&Sub::sub, Sub(), placeholders::_1, placeholders::_2); cout << func(1, 2) << endl; //-1 return 0; }
还有一种场景,bind函数有个好处就是,这种任务池在设计的时候,不⽤考虑都有哪些任务处理方式了,处理函数该如何设计,有多少个什么样的参数,这些都不用考虑了,降低了代码之间的耦合度。代码如下:
#include <iostream> #include <string> #include <vector> #include <functional> void print(const std::string &str) { std::cout << str << std::endl; } int main() { using Functor = std::function<void()>; std::vector<Functor> task_pool; task_pool.push_back(std::bind(print, "你好")); task_pool.push_back(std::bind(print, "我是")); task_pool.push_back(std::bind(print, "Ggggggtm")); for (auto &functor : task_pool) { functor(); } return 0; }
在上述代码中,print函数就是我们要执行的任务,当然还可以有其他的函数。如果没有bind,那么处理各种不同参数的函数是很麻烦的,而这里我们只需要一个bind函数,可以将他们同意看成无参的函数。
3、2 简单的秒级定时任务实现
在当前的⾼并发服务器中,我们不得不考虑⼀个问题,那就是连接的超时关闭问题。我们需要避免⼀个连接⻓时间不通信,但是也不关闭,空耗资源的情况。这时候我们就需要⼀个定时任务,定时的将超时过期的连接进⾏释放。Linux中给我们提供了定时器,代码如下:#include <sys/timerfd.h> int timerfd_create(int clockid, int flags); //clockid : CLOCK_REALTIME - 系统实时时间,如果修改了系统时间就会出问题; CLOCK_MONOTONIC - 从开机到现在的时间是⼀种相对时间; flags : 0 - 默认阻塞属性 int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); //fd : timerfd_create返回的⽂件描述符 // flags : 0 - // 相对时间, 1 - 绝对时间;默认设置为0即可.new: ⽤于设置定时器的新超时时间 old: ⽤于接 收原来的超时时间 struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */ struct timespec it_value; /* 第⼀次超时时间 */ }; // 定时器会在每次超时时,⾃动给fd中写⼊8字节的数据,表⽰在上⼀次读取数据到当前读取数据期间超 // 时了多少次。
下面我们来结合一个实际的例子来看一下。具体如下:
#include <iostream> #include <stdio.h> #include <errno.h> #include <sys/timerfd.h> #include <unistd.h> int main() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if(timerfd < 0) { perror("timerfd_create error"); exit(2); } struct itimerspec itm; itm.it_value.tv_sec = 3; itm.it_value.tv_nsec = 0; itm.it_interval.tv_sec = 3; itm.it_interval.tv_nsec = 0; timerfd_settime(timerfd, 0, &itm, nullptr); while(true) { uint64_t tmp; int n = read(timerfd, &tmp, sizeof tmp); if(n < 0) { perror("read error"); exit(3); } std::cout << "超时了,距离上一次超时: " << tmp << " 次数" << std::endl; } return 0; }
其实上述代码我们就设置了一个每3秒钟的定时器,也就是每个3秒钟,都会出发一次,相当于每个3秒钟像文件中写入一次数据。运行结果如下图:
注意,后面我们会根据定时器实现一个时间轮来完成对超时任务的释放销毁。这里你可能还不理解超时任务的释放销毁,或许会详细讲解到。
3、3 正则库的简单使用
正则表达式(regular expression)描述了一种字符串匹配的模式(pattern),可以用来检查一个串是否含有某种子串、将匹配的子串替换或者从某个串中取出符合某个条件的子串等。
正则表达式的使用,可以使得HTTP请求的解析更加简单(这里指的时程序员的工作变得的简单,这并不代表处理效率会变高,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使用起来更加灵活。
本篇文章就不再过多对正则表达式的详细使用进行详解,但是代码中会有注释,不懂的小伙伴可以取搜索相关文章进行学习。实例代码如下:
#include <regex> void req_line() { std::cout << "------------------first line start-----------------\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\r\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\n"; std::string str = "GET /bitejiuyeke?a=b&c=d HTTP/1.1\r\n"; // 匹配规则 std::regex re("(GET|HEAD|POST|PUT|DELETE) (([^?]+)(?:\\?(.*?))?) (HTTP/1\\.[01])(?:\r\n |\n)"); std::smatch matches; std::regex_match(str, matches, re); /*正则匹配获取完毕之后matches中的存储情况*/ /* matches[0] 整体⾸⾏ GET /bitejiuyeke?a=b&c=d HTTP/1.1 matches[1] 请求⽅法 GET matches[2] 整体URL /bitejiuyeke?a=b&c=d matches[3] ?之前 /bitejiuyeke matches[4] 查询字符串 a=b&c=d matches[5] 协议版本 HTTP/1.1 */ int i = 0; for (const auto &it : matches) { std::cout << i++ << ": "; std::cout << it << std::endl; } if (matches[4].length() > 0) { std::cout << "have param!\n"; } else { std::cout << "have not param!\n"; } std::cout << "------------------first line start-----------------\n"; return; } void method_match(const std::string str) { std::cout << "------------------method start-----------------\n"; std::regex re("(GET|HEAD|POST|PUT|DELETE) .*"); /* () 表⽰捕捉符合括号内格式的数据 * GET|HEAD|POST... |表⽰或,也就是匹配这⼏个字符串中的任意⼀个 * .* 中.表⽰匹配除换⾏外的任意单字符, *表⽰匹配前边的字符任意次; 合起来在这⾥就是 表⽰空格后匹配任意字符 * 最终合并起来表⽰匹配以GET或者POST或者PUT...⼏个字符串开始,然后后边有个空格的字 符串, 并在匹配成功后捕捉前边的请求⽅法字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------method over------------------\n"; } void path_match(const std::string str) { // std::regex re("(([^?]+)(?:\\?(.*?))?)"); std::cout << "------------------path start------------------\n"; std::regex re("([^?]+).*"); /* * 最外层的() 表⽰捕捉提取括号内指定格式的内容 * ([^?]+) [^xyz] 负值匹配集合,指匹配⾮^之后的字符, ⽐如[^abc] 则plain就匹配到 plin字符 * +匹配前⾯的⼦表达式⼀次或多次 * 合并合并起来就是匹配⾮?字符⼀次或多次 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------path over------------------\n"; } void query_match(const std::string str) { std::cout << "------------------query start------------------\n"; std::regex re("(?:\\?(.*?))? .*"); /* * (?:\\?(.*?))? 最后的?表⽰匹配前边的表达式0次或1次,因为有的请求可能没有查询 字符串 * (?:\\?(.*?)) (?:pattern)表⽰匹配pattern但是不获取匹配结果 * \\?(.*?) \\?表⽰原始的?字符,这⾥表⽰以?字符作为起始 * .表⽰\n之外任意单字符, *表⽰匹配前边的字符0次或多次, ?跟在*或+之后表⽰懒惰模式, 也就是说以?开始的字符串就只匹配这⼀次就⾏, 后边还有以?开始的同格式字符串也不不会匹配 () 表⽰捕捉获取符合内部格式的数据 * 合并起来表⽰的就是,匹配以?开始的字符串,但是字符串整体不要, * 只捕捉获取?之后的字符串,且只匹配⼀次,就算后边还有以?开始的同格式字符串也不不会匹 配 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------query over------------------\n"; } void version_mathch(const std::string str) { std::cout << "------------------version start------------------\n"; std::regex re("(HTTP/1\\.[01])(?:\r\n|\n)"); /* * (HTTP/1\\.[01]) 外层的括号表⽰捕捉字符串 * HTTP/1 表⽰以HTTP/1开始的字符串 * \\. 表⽰匹配 . 原始字符 * [01] 表⽰匹配0字符或者1字符 * (?:\r\n|\n) 表⽰匹配⼀个\r\n或者\n字符,但是并不捕捉这个内容 * 合并起来就是匹配以HTTP/1.开始,后边跟了⼀个0或1的字符,且最终以\n或者\r\n作为结 尾的字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------version over------------------\n"; }
3、4 通用类型any类型的实现
所谓通用类型,就是可以存储任意类型。我们第一时间可能想到通过模板来实现,代码如下:
template<class T> class Any { T _any; };
但上述并不是我们想要的。但是我们在定义Any对象时,必须指定参数。使用模板并不是我们想要的,我们想要的是如下:
/* template<class T> class Any { T _any; }; Any<int> a; a = 10; */ // 我们实际上想要的 Any a; a = 10; a = "Ggggggtm";
所以使用模板是肯定不行的。那我们就想到了类内再嵌套一个类,这样行不行呢?
class Any { private: template<class T> class placeholder { T _val; }; placeholder<T> _content; };
这样好像也不太行,因为我们在实例化Any类内中的placeholder对象时,也必须指定类型。那么有没有什么很好的办法,在实例化Any类中的成员变量对象时,不用指定其类型还能很好的存储任意类型呢?这里就可以使用多态的方法。思路是:
- 利用多态的特点,父类对象指向子类对象,也可以安全的访问子类对象中的成员;
- 子类使用模板,来存储任意类型;
- Any类中存储父类对象指针,来调用子类成员。
- 当我们存储任意类型时,new一个子类对象来保存数据, 然后用子类对象来初始化Any类中的所保存的父类(holder)对象指针即可。
大体的思路代码如下:
class Any { private: class holder { //...... }; template<class T> class placeholder : public holder { //..... T _val; }; holder* _content; };
整体的思路有了,下面我们直接给出实现代码,其中详细细节就不再过多解释。代码如下:
class Any { public: Any() :_content(nullptr) {} template<class T> Any(const T& val) :_content(new placeholder<T>(val)) {} Any(const Any& other) :_content(other._content ? other._content->clone() : nullptr) {} ~Any() { delete _content; } template<class T> T* get() { assert(typeid(T) == _content->type()); return &(((placeholder<T>*)_content)->_val); } Any& Swap(Any& other) { std::swap(_content, other._content); return *this; } template<class T> Any& operator=(const T& val) { Any(val).Swap(*this); return *this; } Any& operator=(const Any& other) { Any(other).Swap(*this); return *this; } private: class holder { public: virtual ~holder() {} virtual const std::type_info& type() = 0; virtual holder* clone() = 0; }; template<class T> class placeholder : public holder { public: placeholder(const T& val) :_val(val) {} virtual const std::type_info& type() { return typeid(T); } virtual holder* clone() { return new placeholder(_val); } public: T _val; }; holder *_content; };
四、服务器功能模块划分与实现
实现一个Reactor模式的服务器,首先肯定需要进行网络套接字编程。Reactor模式就是基于多路转接技术继续进行实现的,那么我们肯定需要对IO事件进行监控,然后对就绪的IO事件进行处理。怎么判断接收到的数据是否是一份完整的数据呢?所以在这里我们还要进行协议定制。当然,我们用的就是HTTP协议模式进行传输数据。那么不够一份完整的报文时,我们需要将接收到的数据暂时保存起来,那么肯定还需要定义一个接受和发送缓冲区。同时我们这个所实现的服务器当中,还添加了对不活跃链接的销毁,在后面我们也会详细讲到。
4、1 Buffer模块
Buffer模块是一个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能。Buffer模块主要就是用于当我们接收到一个不完整的报文时,需要将该报文暂时保存起来。同时,我们在对于客户端响应的数据,应该是在套接字可写的情况下进行发送,所以需要把数据放到暂时放到Buffer 的发送缓冲区当中。
对于缓冲区,我们只需要一段线性的空间来保存即可。那就可以直接用vector即可。我们实现的功能大概如下:
写入位置:
- 当前写入位置指向哪里,从哪里开始写入
- 如果后续剩余空间不够了!
- 考虑整体缓冲区空闲空间是否足够!(因为读位置也会向后偏移,前后有可能有空闲空间)
- 缓冲区空闲空间足够:将数据移动到起始位置
- 缓冲区空闲空间不够:扩容,从当前写位置开始扩容足够大小!
- 数据一旦写入成功,当前写位置,向后偏移!
读取数据:
- 当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
- 可读数据大小:当前写入位置,减去当前读取位置!
整体实现相对来说较为简单,这里我们就直接给出代码,就不再做过多解释。
#include <ctime> #include <cstring> #include <iostream> #include <vector> #include <cassert> #include <string> using namespace std; #define BUFFER_SIZE 1024 class Buffer { private: std::vector<char> _buffer; // 使用vector进行内存空间管理 uint64_t _read_idx; // 读偏移 uint64_t _write_idx; // 写偏移 public: Buffer():_read_idx(0),_write_idx(0),_buffer(BUFFER_SIZE) {} char* begin() {return &*_buffer.begin();} // 获取当前写入起始地址 char *writePosition() { return begin() + _write_idx;} // 获取当前读取起始地址 char *readPosition() { return begin() + _read_idx; } // 获取缓冲区末尾空间大小 —— 写偏移之后的空闲空间,总体大小减去写偏移 uint64_t tailIdleSize() {return _buffer.size() - _write_idx; } // 获取缓冲区起始空间大小 —— 读偏移之前的空闲空间 uint64_t handIdleSize() {return _read_idx ;} // 获取可读空间大小 = 写偏移 - 读偏移 uint64_t readAbleSize() {return _write_idx - _read_idx ;} // 将读偏移向后移动 void moveReadOffset(uint64_t len) { // 向后移动大小必须小于可读数据大小 assert(len <= readAbleSize()); _read_idx += len; } // 将写偏移向后移动 void moveWriteOffset(uint64_t len) { assert(len <= tailIdleSize()); _write_idx += len; } void ensureWriteSpace(uint64_t len) { // 确保可写空间足够 (整体空间够了就移动数据,否则就扩容!) if (tailIdleSize() >= len) return; // 不够的话 ,判断加上起始位置够不够,够了将数据移动到起始位置 if (len <= tailIdleSize() + handIdleSize()) { uint64_t rsz = readAbleSize(); //帮当前数据大小先保存起来 std::copy(readPosition(),readPosition() + rsz,begin()); // 把可读数据拷贝到起始位置 _read_idx = 0; // 读归为0 _write_idx = rsz; // 可读数据大小是写的偏移量! } else { // 总体空间不够!需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可! _buffer.resize(_write_idx + len); } } // 写入数据 void Write(const void *data,uint64_t len) { ensureWriteSpace(len); const char *d = (const char*) data; std::copy(d,d + len,writePosition()); } void WriteAndPush(void* data,uint64_t len) { Write(data,len); moveWriteOffset(len); } void WriteStringAndPush(const std::string &data) { writeString(data); moveWriteOffset(data.size()); } void writeString(const std::string &data) { return Write(data.c_str(),data.size()); } void writeBuffer(Buffer &data) { return Write(data.readPosition(),data.readAbleSize()); } void writeBufferAndPush(Buffer &data) { writeBuffer(data); moveWriteOffset(data.readAbleSize()); } std::string readAsString (uint64_t len) { assert(len <= readAbleSize()); std::string str; str.resize(len); Read(&str[0],len); return str; } void Read(void *buf,uint64_t len) { // 读取数据 1. 保证足够的空间 2.拷贝数据进去 // 要求获取的大小必须小于可读数据大小! assert(len <= readAbleSize()); std::copy(readPosition(),readPosition() + len,(char*)buf); } void readAndPop(void *buf,uint64_t len) { Read(buf,len); moveReadOffset(len); } // 逐步调试!!!!! std::string ReadAsStringAndPop(uint64_t len) { assert(len <= readAbleSize()); std::string str = readAsString(len); moveReadOffset(len); return str; } char* FindCRLF() { char *res = (char*)memchr(readPosition(),'\n',readAbleSize()); return res; } // 通常获取一行数据,这种情况针对是: std::string getLine() { char* pos = FindCRLF(); if (pos == NULL) { return ""; } // +1 为了把换行数据取出来! return readAsString(pos - readPosition() + 1); } std::string getLineAndPop() { std::string str = getLine(); moveReadOffset(str.size()); return str; } void Clear() { // 清空缓冲区!clear // 只需要将偏移量归0即可! _read_idx = 0; _write_idx = 0; } };
4、2 Socket模块
我们在编写服务器时,少不了的肯定是需要Socket套接字编程的。Socket模块就是对网络套接字编程进行一个封装,方便我们后面直接进行相关操作。主要功能如下:
- 创建套接字(socket);
- 绑定地址信息(bind);
- 开始监听(listen);
- 向服务器发起连接(connect);
- 获取新连接(accept);
- 接受数据(recv);
- 发送数据(send);
- 关闭套接字(close);
- 创建一个监听链接;
- 创建一个客户端连接;
- 开启地址和端口重用;
- 设置套接字为非阻塞。
这里对简单的一些网络套接字接口就不再过多解释,对上述功能的后四点进行简单解释。我们先来看一下该模块的代码实现:
#define MAX_LISTEN 1024 class Socket { public: Socket() : _sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() { Close(); } int Fd() { return _sockfd; } bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (_sockfd < 0) { ERR_LOG("create socket failed!"); return false; } return true; } bool Bind(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = bind(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("bind sockfd failed!"); return false; } return true; } bool Listen(int backlog = MAX_LISTEN) { int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG("listen sockfd failed!"); return false; } return true; } // 向服务器发起连接 bool Connect(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = connect(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("connect server failed!"); return false; } return true; } int Accept() { int newfd = accept(_sockfd, nullptr, nullptr); if (newfd < 0) { ERR_LOG("accept socker failed"); return -1; } return newfd; } ssize_t Recv(void *buf, size_t len, int flag = 0) { ssize_t ret = recv(_sockfd, buf, len, flag); if (ret <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("recv msg failed!"); return -1; } return ret; } ssize_t NonBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); } ssize_t Send(const void *buf, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buf, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("send msg failed!"); return -1; } return ret; } ssize_t NonBlockSend(const void *buf, size_t len) { return Send(buf, len, MSG_DONTWAIT); } void Close() { if (_sockfd) { close(_sockfd); _sockfd = -1; } } // 创建一个服务器端连接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; } // 创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; } // 设置套接字选项 —— 开启地址端口重用 void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); } // 设置套接字为非阻塞 void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); } private: int _sockfd; };
我们知道在Tcp通信当中,建立连接会有三次握手,断开连接会有四次挥手。而主动断开链接的一方在进行第四次挥手的时候会变成TIME_WAIT状态,也就四需要等上一段时间该链接才算断开释放。这也就意味着主动断开连接的一方并不能很快的重新建立连接。为了解决这种情况,可以通过setsockopt函数进行设置套接字选项,开启地址和端口重用。具体封装后的代码如下:
void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); }
我们知道,调用read读取数据的时候,如果底层数据不就绪,默认情况下是阻塞的。在我们实现的服务器时,并不像让其阻塞。如果在读取数据时阻塞了,就会导致其他的任务得不到很好的执行。所以我们还需要一个对套接字设置非阻塞的功能。封装后的代码如下:
void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); }
需要注意的是,当read函数以非阻塞方式读取数据时,如果底层数据不就绪,那么read函数就会立即返回,但当底层数据不就绪时,read函数是以出错的形式返回的,此时的错误码会被设置为
EAGAIN
或EWOULDBLOCK
。因此在以非阻塞方式读取数据时,如果调用read函数时得到的返回值是-1,此时还需要通过错误码进一步进行判断,如果错误码的值是EAGAIN或EWOULDBLOCK,说明本次调用read函数出错是因为底层数据还没有就绪,因此后续还应该继续调用read函数进行轮询检测数据是否就绪,当数据继续时再进行数据的读取。
此外,调用read函数在读取到数据之前可能会被其他信号中断,此时read函数也会以出错的形式返回,此时的错误码会被设置为EINTR,此时应该重新执行read函数进行数据的读取。
因此在以非阻塞的方式读取数据时,如果调用read函数读取到的返回值为-1,此时并不应该直接认为read函数在底层读取数据时出错了,而应该继续判断错误码,如果错误码的值为
EAGAIN
、EWOULDBLOCK
或EINTR
则应该继续调用read函数再次进行读取或者说明底层没有数据。
创建一个监听连接是什么意思呢?当我们服务端创建套接字、绑定ip和端口后,需要将该套接字设置为监听状态,以上过程就是在创建一个监听的连接,也就是创建一个服务端连接。我们对上述的过程进行了封装,具体封装后的代码如下:‘
// 创建一个服务器端连接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; }
创建客户端连接是什么意思?无非就是创建一个套接字,然后向服务器发起请求连接。这也是客户端所需要做的。我们对其进行简单封装后代码如下:
// 创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; }
4、3 Channel模块
高性能服务器必备的就是多路转接技术。当然,我们的项目也不例外。我们需要利用多路转接技术来帮我们进行等待(监控)事件就绪。且当有事件就绪时,会有一个Handler函数根据所触发的就绪事件统一帮我们处理就绪后的操作。
每个通信套接字都会有许多不同的事件,例如:读事件、写事件等等。为了方便我们后续对描述符(套接字)的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰,我们在这里对描述符(套接字)监控的事件和管理进行封装。那么Channel模块的主要功能就很清晰了。
1.对监控事件的管理:
- 判断描述符是否可读;
- 判断描述符是否可写;
- 对描述符监控添加可读;
- 对描述符监控添加可写;
- 解除可读事件监控;
- 解除可写事件监控;
- 解除所有事件监控。
2.对监控事件触发后的处理:
- 设置对于不同事件的回调处理函数;
- 明确触发了某个事件该如何处理。
我们先看一下Channel模块的代码:
class Channel { private: int _fd; uint32_t events; // 当前需要监控的事件 uint32_t revents; // 当前连接触发的事件 using eventCallback = std::function < void() > ; eventCallback _read_callback; // 可读事件被触发的回调函数 eventCallback _error_callback; // 可写事件被触发的回调函数 eventCallback _close_callback; // 连接关闭事件被触发的回调函数 eventCallback _event_callback; // 任意事件被触发的回调函数 eventCallback _write_callback; // 可写事件被触发的回调函数 public: Channel(int fd) : fd(_fd) {} int Fd() { return _fd; } void SetRevents(uint32_t events) { _revents = events; } void setReadCallback(const eventCallback &cb) { _read_callback = cb; } void setWriteCallback(const eventCallback &cb) { _write_callback = cb; } void setErrorCallback(const eventCallback &cb) { _error_callback = cb; } void setCloseCallback(const eventCallback &cb) { _close_callback = cb; } void setEventCallback(const eventCallback &cb) { _event_callback = cb; } bool readAble() { // 当前是否可读 return (_events & EPOLLIN); } bool writeAble() { // 当前是否可写 return (_events & EPOLLOUT); } void enableRead() { // 启动读事件监控 _events |= EPOLLIN; // 后面会添加到EventLoop的事件监控! } void enableWrite() { // 启动写事件监控 _events |= EPOLLOUT; // 后面会添加到EventLoop的事件监控! } void disableRead() { // 关闭读事件监控 _events &= ~EPOLLIN; // 后面会修改到EventLoop的事件监控! } void disableWrite() { // 关闭写事件监控 _events &= ~EPOLLOUT; } void disableAll() { // 关闭所有事件监控 _events = 0; } void Remove(); // 后面会调用EventLoop接口移除监控 void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } /*有可能会释放连接的操作事件,一次只处理一个*/ if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); // 一旦出错,就会释放连接,因此要放到前边调用任意回调 } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } /*不管任何事件,都调用的回调函数*/ if (_event_callback) _event_callback(); } };
注意,我们在这里使用多路转接技术时,采用的时epoll。因为epoll的编码简单,且效率最高。所以在私有成员变量时,我们给出了监控事件和就绪事件。
同时,我们这里使用了通用的函数封装器function。原因就是我们并不知道所触发事件的回调函数所需要的参数。当在设置回调函数时,使用函数包装器bind进行绑定参数即可。
在Handler也就是上述的HandlerEvent函数中,我们对所触发的事件需要回调进行了分类。读事件触发后,并不会直接释放连接(后续会讲解原因)。其他事件触发后,都有可能导致连接被释放,所以一次处理一个事件,以防连接被释放的情况下再去处理事件就会导致陈鼓型崩溃。
4、4 Poller模块
上述的Channel模块是对描述符的监控事件进行管理的封装。现在我们还需要对描述符进行IO事件监控啊!说明这两个模块是密切关联的。
上述我们也提到了,所用的多路转接模型是epoll。那么该模块就是对epoll的操作进行封装的。封装思想:
- 必须拥有一个epoll的操作句柄;
- 拥有一个struct epoll_event 结构数组,监控保存所有的活跃事件;
- 使用一个哈希表管理描述符与描述符对应的事件管理Channnel对象。
整体逻辑流程:
- 对描述符进行监控,通过Channnel才能知道描述符监控什么事件(注意,我们在使用epoll对事件监控前,一定是在Channel模块中对所需要监控的事件events进行了设置,然后再使用epoll进行监控);
- 当描述符就绪了,通过描述符在哈希表中找到对应的Channel(当然,我们都会添加Channel到哈希表种的。得到了Channel才知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel。
通过对上述的了解,我们就已经知道该模块所需要实现的功能了。具体如下:
- 添加事件监控 (channel模块);
- 修改事件监控;
- 移除事件监控;
- 开始事件监控。
具体该模块实现代码如下:
#define MAX_EPOLLEVENTS 1024 // Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。 class Poller { private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVENTS]; std::unordered_map<int, Channel *> _channels; private: // 对epoll直接操作 void Update(Channel *channel, int op) { int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG("EPOLLCTL FAILED!!!"); abort(); // 推出程序!! } } // 判断一个Channel是否已经添加到了事件监控 bool hashChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { return false; } return true; } public: Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { ERR_LOG("EPOLL CREATE FAILED!!"); abort(); // 退出程序 } } // 添加或者修改监控事件 void UpdateEvent(Channel *channel) { // 有描述符也有事件 bool ret = hashChannel(channel); if (ret == false) { _channels.insert(std::make_pair(channel->Fd(), channel)); return Update(channel, EPOLL_CTL_ADD); // 不存在添加 } return Update(channel, EPOLL_CTL_MOD); // 存在了更新 } // 移除监控事件 void removeEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } // 开始监控,返回活跃链接! void Poll(std::vector<Channel *> *active) { // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout) int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return; } ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno)); abort(); // 退出程序 } for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->setRevents(_evs[i].events); // 设置实际就绪的事件 active->push_back(it->second); } return; } };
这个模块主要都是封装的对epoll的操作。其中需要注意的是,我们在对事件开始监控时,需要将不同描述符就绪的事件进行返回,以便我们后续进行操作。所以这里就传入了一个指针,作为输出型参数。
4、5 Eventloop模块
这个模块其实就是我们所说的 one thread one loop中的loop,也是我们所说的reactor。这个模块必然是一个模块对应一个线程。这个模块是干什么的呢?其实就是进行事件监控管理和事件处理的模块。你也可以理解为对Channel模块和Poller模块的整合。接下来我们详细解释一下该模块的思路讲解。
EventLoop模块是进行时间监控,以及事件处理的模块。同时这个模块还是与线程一一对应的。监控了一个链接,而这个连接一旦就绪,就要进行事件处理。假如一个线程正在执行就绪事件,那么该连接再有其他事件就绪呢?会不会就被分配到其他线程了呢?但是如果这个描述符在多个线程中都出发了事件进行处理,就会存在线程安全的问题。因此我们需要将一个连接的事件监控,以及连接事件的处理,以及其他操作都放在同一个线程当中进行。
但是问题又来了:如何保证一个连接所有的操作都在eventloop对应的线程中执行呢?我们可以在eventloop模块中添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列当中去。
总结eventloop处理流程:
- 在线程中对描述符进行事件监控;
- 有描述符就绪,则对描述符进行事件处理(必须保证处理回调函数中的操作都在线程当中);
- 所有的就绪事件处理完了,这时候再去将任务队列中的任务一一执行。
事件监控就交给Poller模块来处理,有事件就绪了则进行处理事件。但是有一个需要注意的点:因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞。
我们再来看一下eventfd函数。如下图:
eventfd:一种事件通知机制,该函数就是创建一个描述符用于实现事件通知,eventfd本质在内核里边管理的就是一个计数器。创建eventfd就会在内核中创建一个计数器(结构),每当向evenfd中写入一个数值--用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数。假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0。用处:在EventLoop模块中实现线程间的事件通知功能。eventfd也是通过read/write/close进行操作的。
接下来我们看一下该模块的代码实现:
class EventLoop { private: using Functor = std::function<void()>; std::thread::id _thread_id; // 线程ID int _event_fd; // eventfd 唤醒IO事件监控有可能的阻塞!!! std::unique_ptr<Channel> _event_channel; Poller _poller; // 进行所有描述符的事件监控 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全!!! public: // 执行任务池中的所有任务!! void runAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); // 出了作用域,锁就会被解开!! _tasks.swap(functor); } for (auto &f : functor) { f(); } return; } static int createEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("CREATE ENVENTED FAILED !!!"); abort(); } return efd; } void readEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } void weakEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(createEventFd()), _event_channel(new Channel(this, _event_fd)), { // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数 _event_channel->setReadCallback(std::bind(&EventLoop::readEventfd, this)); // 启动eventfd的读事件监控 _event_channel->enableRead(); } void runInLoop(const Functor &cb) { // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。 if (isInLoop()) { return cb(); } return QueueInLoop(cb); } void queueInLoop(const Functor &cb) { // 将操作压入任务池! std::unique_lock<std::mutex> _lock(_mutex); // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞; // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件 _tasks.push_back(cb); weakEventFd(); } bool isInLoop() { // 永远判断当前线程是否是EventLoop所对应的线程 return (_thread_id == std::this_thread::get_id()); } void updateEvent(Channel *channel) { // 添加/修改描述符的事件监控 return _poller.UpdateEvent(channel); } void removeEvent(Channel *channel) { // 移除描述符的监控 return _poller.removeEvent(channel); } void Start() { // 任务监控完毕进行处理任务! // 三步走:事件监控-》就绪事件处理-》执行任务 std::vector<Channel *> actives; _poller.Poll(&actives); for (auto &channel : actives)