【在Linux世界中追寻伟大的One Piece】多路转接epoll(续)

目录

1 -> epoll的工作方式

1.1 -> 水平触发(Level Triggered)工作模式

1.2 -> 边缘触发(Edge Triggered)工作模式

2 -> 对比LT与ET

3 -> 理解ET模式和非阻塞文件描述符

4 -> epoll的使用场景

5 -> epoll示例

5.1 -> epoll服务器(LT模式)

5.2 -> epoll服务器(ET模式)


1 -> epoll的工作方式

epoll有2中工作方式:

  • 水平触发(LT)
  • 边缘触发(ET)

加入有这样一个例子:

  • 已经把一个tcp_socket添加到epoll描述符。
  • 这个时候socket的另一端被写入了2KB的数据。
  • 调用epoll_wait,并且它会返回。说明它已经准备好读取操作。
  • 然后调用read,只读取了1KB的数据。
  • 继续调用epoll_wait……

1.1 -> 水平触发(Level Triggered)工作模式

epoll默认状态下就是LT工作模式。 

  • 当epoll检测到socket上事件就绪的时候,可以不立刻进行处理。或者只处理一部分。
  • 如上面的例子,由于只读了1K的数据,缓冲区中还剩1K的数据,在第二次调用epoll_wait时,epoll_wait仍然会立刻返回并通知socket读事件就绪。
  • 直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回。
  • 支持阻塞读写和非阻塞读写

1.2 -> 边缘触发(Edge Triggered)工作模式

如果我们在第一步将socket添加到epoll描述符的时候使用了EPOLLET标志,epoll进入ET工作模式。

  • 当epoll检测到socket上事件就绪时,必须立刻处理。
  • 如上面的例子,虽然只读了1K的数据,缓冲区还剩1K的数据,在第二次调用epoll_wait的时候,epoll_wait不会再返回了。
  • 也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理机会。
  • ET的性能比LT的性能更高(epoll_wait返回的次数少了很多)。Nginx默认采用ET模式使用epoll。
  • 只支持非阻塞的读写

select和poll其实也是工作在LT模式下。epoll既可以支持LT,也可以支持ET。

2 -> 对比LT与ET

LT是epoll的默认行为

使用ET能够减少epoll触发的次数。但是代价就是强逼着程序员一次响应就绪过程中就把所有的数据都处理完。

相当于一个文件描述符就绪后,不会反复被提示就绪,看起来就比LT更高效一些。但是在LT情况下如果也能做到每次就绪的文件描述符都立刻处理,不让这个就绪被反复提示的话,其实性能也是一样的。

另一方面,ET的代码复杂程度更高了。

3 -> 理解ET模式和非阻塞文件描述符

使用ET模式的epoll,需要将文件描述设置为非阻塞。这个不是接口上的要求,而是“工程实践”上的要求。

假设一个场景:服务器接收到一个10k的请求,会向客户端返回一个应答数据。如果客户端收不到应答,不会发送第二个10K的请求。

如果服务端写的代码是阻塞式的read,并且一次只read1K数据的话(read不能保证一次就把所有的数据都读出来,参考man手册的说明,可能被信号打断),剩下的9K数据就会待在缓冲区中。

此时由于epoll是ET模式,并不会认为文件描述符读就绪。epoll_wait就不会再次返回。剩下的9K数据会一直在缓冲区中。直到下一次客户端再给服务器写数据。

但是问题来了。

  • 服务器只读到1K个数据,要10K读完才会给客户端返回响应数据。
  • 客户端要读到服务器的响应,才会发送下一个请求。
  • 客户端发送了下一个请求,epoll_wait才会返回,才能去读缓冲区中剩余的数据。

所以,为了解决上述问题(阻塞read不一定能一下把完整的请求读完),于是就可以使用非阻塞轮训的方式来读缓冲区,保证一定能把完整的请求都读出来。

而如果是LT没这个问题。只要缓冲区中的数据没读完,就能够让epoll_wait返回文件描述符读就绪。

4 -> epoll的使用场景

epoll的高性能,是有一定的特定场景的。如果场景选择的不适宜,epoll的性能可能适得其反。

  • 对于多连接,且多连接中只有一部分连接比较活跃时,比较适合使用epoll。

例如,典型的一个需要处理上万个客户端的服务器,例如各种互联网APP的入口服务器,这样的服务器就很适合epoll。

如果只是系统内部,服务器和服务器之间进行通信,只有少数的几个连接,这种情况下用epoll就并不适合。具体要根据需求和场景特点来决定使用哪种IO模型。

5 -> epoll示例

5.1 -> epoll服务器(LT模式)

tcp_epoll_server.hpp

///
// 封装一个 Epoll 服务器, 只考虑读就绪的情况
///

#pragma once
#include <vector>
#include <functional>
#include <iostream>
#include <sys/epoll.h>
#include "tcp_socket.hpp"

typedef std::function<void(const std::string&, std::string*resp)> Handler;

class Epoll 
{
public:
	Epoll() 
	{
		epoll_fd_ = epoll_create(10);
	}

	~Epoll() 
	{
		close(epoll_fd_);
	}

	bool Add(const TcpSocket& sock) const 
	{
		int fd = sock.GetFd();
		printf("[Epoll Add] fd = %d\n", fd);

		epoll_event ev;
		ev.data.fd = fd;
		ev.events = EPOLLIN;

		int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
		if (ret < 0) 
		{
			perror("epoll_ctl ADD");
			return false;
		}

		return true;
	}

	bool Del(const TcpSocket& sock) const 
	{
		int fd = sock.GetFd();

		printf("[Epoll Del] fd = %d\n", fd);

		int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
		if (ret < 0) 
		{
			perror("epoll_ctl DEL");
			return false;
		}

		return true;
	}

	bool Wait(std::vector<TcpSocket>* output) const 
	{
		output->clear();
		epoll_event events[1000];

		int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
		if (nfds < 0) 
		{
			perror("epoll_wait");
			return false;
		}

		// [注意!] 此处必须是循环到 nfds, 不能多循环
		for (int i = 0; i < nfds; ++i) 
		{
			TcpSocket sock(events[i].data.fd);
			output->push_back(sock);
		}

		return true;
	}

private:
	int epoll_fd_;
};

class TcpEpollServer 
{
public:
	TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip),
		port_(port) 
	{
	}

	bool Start(Handler handler) 
	{
		// 1. 创建 socket
		TcpSocket listen_sock;
		CHECK_RET(listen_sock.Socket());

		// 2. 绑定
		CHECK_RET(listen_sock.Bind(ip_, port_));

		// 3. 监听
		CHECK_RET(listen_sock.Listen(5));

		// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
		Epoll epoll;
		epoll.Add(listen_sock);

		// 5. 进入事件循环
		for (;;) 
		{
			// 6. 进行 epoll_wait
			std::vector<TcpSocket> output;
			if (!epoll.Wait(&output)) 
			{
				continue;
			}

			// 7. 根据就绪的文件描述符的种类决定如何处理
			for (size_t i = 0; i < output.size(); ++i) 
			{
				if (output[i].GetFd() == listen_sock.GetFd()) 
				{
					// 如果是 listen_sock, 就调用 accept
					TcpSocket new_sock;
					listen_sock.Accept(&new_sock);
					epoll.Add(new_sock);
				}
				else 
				{
					// 如果是 new_sock, 就进行一次读写
					std::string req, resp;
					bool ret = output[i].Recv(&req);
					if (!ret) 
					{
						// [注意!!] 需要把不用的 socket 关闭
						// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
						epoll.Del(output[i]);
						output[i].Close();
						continue;
					}

					handler(req, &resp);
					output[i].Send(resp);
				} // end for
			} // end for (;;)
		}

		return true;
	}

private:
	std::string ip_;
	uint16_t port_;
};

dict_server.cc只需要将server对象的类型改成TcpEpollServer即可。

5.2 -> epoll服务器(ET模式)

基于LT版本稍加修改即可。

  1. 修改tcp_socket.hpp,新增非阻塞读和非阻塞写接口。
  2. 对于accept返回的new_sock加上EPOLLET这样的选项。

注意:

此代码暂时未考虑listen_sock ET的情况。如果将listen_sock设为ET,则需要非阻塞轮询的方式accept。否则会导致同一时刻大量的客户端同时连接的时候,只能accept一次的问题。

tcp_socket.hpp

// 以下代码添加在 TcpSocket 类中
// 非阻塞 IO 接口

bool SetNoBlock() 
{
	int fl = fcntl(fd_, F_GETFL);
	if (fl < 0) 
	{
		perror("fcntl F_GETFL");
		return false;
	}

	int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK);
	if (ret < 0) 
	{
		perror("fcntl F_SETFL");
		return false;
	}

	return true;
}

bool RecvNoBlock(std::string* buf) const 
{
	// 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误
	// 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试
	// 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环
	// 这种写法其实不算特别严谨(没有考虑粘包问题)
	buf->clear();
	char tmp[1024 * 10] = { 0 };
	for (;;) 
	{
		ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0);
		if (read_size < 0) 
		{
			if (errno == EWOULDBLOCK || errno == EAGAIN) 
			{
				continue;
			}
			perror("recv");
			return false;
		}

		if (read_size == 0) 
		{
			// 对端关闭, 返回 false
			return false;
		}

		tmp[read_size] = '\0';
		*buf += tmp;
		if (read_size < (ssize_t)sizeof(tmp) - 1) 
		{
			break;
		}
	}

	return true;
}

bool SendNoBlock(const std::string& buf) const 
{
	// 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况
	// 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗
	// 而要进行重试
	ssize_t cur_pos = 0; // 记录当前写到的位置
	ssize_t left_size = buf.size();
	for (;;) 
	{
		ssize_t write_size = send(fd_, buf.data() + cur_pos,
			left_size, 0);
		if (write_size < 0) 
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK) 
			{
				// 重试写入
				continue;
			}

			return false;
		}

		cur_pos += write_size;
		left_size -= write_size;
		// 这个条件说明写完需要的数据了
		if (left_size <= 0) 
		{
			break;
		}
	}

	return true;
}

tcp_epoll_server.hpp

///
// 封装一个 Epoll ET 服务器
// 修改点:
// 1. 对于 new sock, 加上 EPOLLET 标记
// 2. 修改 TcpSocket 支持非阻塞读写
// [注意!] listen_sock 如果设置成 ET, 就需要非阻塞调用 accept 了
///

#pragma once

#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"

typedef std::function<void(const std::string&, std::string* resp)> Handler;

class Epoll 
{
public:
	Epoll() 
	{
		epoll_fd_ = epoll_create(10);
	}

	~Epoll() 
	{
		close(epoll_fd_);
	}

	bool Add(const TcpSocket& sock, bool epoll_et = false) const 
	{
		int fd = sock.GetFd();
		printf("[Epoll Add] fd = %d\n", fd);

		epoll_event ev;
		ev.data.fd = fd;
		if (epoll_et) 
		{
			ev.events = EPOLLIN | EPOLLET;
		}
		else 
		{
			ev.events = EPOLLIN;
		}

		int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
		if (ret < 0) 
		{
			perror("epoll_ctl ADD");
			return false;
		}

		return true;
	}

	bool Del(const TcpSocket& sock) const 
	{
		int fd = sock.GetFd();
		printf("[Epoll Del] fd = %d\n", fd);

		int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
		if (ret < 0) 
		{
			perror("epoll_ctl DEL");
			return false;
		}

		return true;
	}

	bool Wait(std::vector<TcpSocket>* output) const 
	{
		output->clear();
		epoll_event events[1000];

		int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
		if (nfds < 0) 
		{
			perror("epoll_wait");
			return false;
		}

		// [注意!] 此处必须是循环到 nfds, 不能多循环
		for (int i = 0; i < nfds; ++i) 
		{
			TcpSocket sock(events[i].data.fd);
			output->push_back(sock);
		}

		return true;
	}

private:
	int epoll_fd_;
};

class TcpEpollServer 
{
public:
	TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip),
		port_(port) 
	{
	}

	bool Start(Handler handler) 
	{
		// 1. 创建 socket
		TcpSocket listen_sock;
		CHECK_RET(listen_sock.Socket());

		// 2. 绑定
		CHECK_RET(listen_sock.Bind(ip_, port_));

		// 3. 监听
		CHECK_RET(listen_sock.Listen(5));

		// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
		Epoll epoll;
		epoll.Add(listen_sock);

		// 5. 进入事件循环
		for (;;) 
		{
			// 6. 进行 epoll_wait
			std::vector<TcpSocket> output;
			if (!epoll.Wait(&output)) 
			{
				continue;
			}

			// 7. 根据就绪的文件描述符的种类决定如何处理
			for (size_t i = 0; i < output.size(); ++i) 
			{
				if (output[i].GetFd() == listen_sock.GetFd()) 
				{
					// 如果是 listen_sock, 就调用 accept
					TcpSocket new_sock;
					listen_sock.Accept(&new_sock);
					epoll.Add(new_sock, true);
				}
				else 
				{
					// 如果是 new_sock, 就进行一次读写
					std::string req, resp;
					bool ret = output[i].RecvNoBlock(&req);
					if (!ret) 
					{
						// [注意!!] 需要把不用的 socket 关闭
						// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
						epoll.Del(output[i]);
						output[i].Close();
						continue;
					}

					handler(req, &resp);
					output[i].SendNoBlock(resp);
					printf("[client %d] req: %s, resp: %s\n",
						output[i].GetFd(),
						req.c_str(), resp.c_str());
				} // end for
			} // end for (;;)
		}

		return true;
	}

private:
	std::string ip_;
	uint16_t port_;
};

感谢各位大佬支持!!!

互三啦!!!

上一篇:29-Elasticsearch 集群监控