windows的IOCP(Input Output Completion Port,输入输出完成端口)
2019年6月2日
12:27
如果仅仅只是代码层面上的话,windows上的IOCP逻辑好像也不是太难。但是实际上总是有一种雾里看花的感觉。因为能力所限,不能去深入了解一下到底是怎么回事
? ?
为了突破select等传统I/O模型的极限,每种操作系统(内核级别)都会提供特有的I/O模型以提高性能。其中最具代表性的有Linux的epoll、BSD的kqueue及Windows的IOCP。他们都在操作系统界别提供支持并完成功能。
? ?
问:epoll快还是IOCP快?
服务器端的响应事件和并发服务器数是衡量服务器端好坏的重要因素。但对于普通人来讲,这两种模型已经非常优秀。基于epoll实现的服务器端在并发数量上有缺陷,但换成IOCP后没有任何问题。IOCP的响应时间太慢是个问题,换成epoll后就解决了。
在硬件性能和分配带宽充足的情况下,如果响应时间和并发数量出现了问题,首先应怀疑以下两点,修正后通常会解决大部分问题。
- 低效的IO结构或者低效的CPU使用
- 数据库设计和查询语句(Query)的结构
虽然IOCP拥有在其他IO模型不具备的优点,但这并非左右服务器端性能的绝对因素,并且不可能任何情况下都体现这种优点。他们的主要差异在于操作系统的内部工作机制。
? ?
通过重叠IO理解IOCP
创建非阻塞模式的套接字
? ?
SOCKET hLisnSock;
int mode = 1;
…
hListSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
ioctlsocket(hLisnSock, FIONBIO, &mode); // for non-blocking socket
…
? ?
ioctlsocket函数完成以下功能:
将hLisnSock句柄引用的套接字IO模式(FIONBIO)改为变量mode中指定的形式。
FIONBIO为设置选项,mode中的值即为设置启用与否。函数调用完成后,mode值为非0,则以更改为非阻塞模式,否则为阻塞模式
? ?
非阻塞模式套接字除了以非阻塞模式进行IO以外,还具有如下功能:
- 如果在没有客户端连接请求的状态下调用accept函数,将直接返回INVALID_SOCKET。调用WSAGetLastError函数会返回WSAEWOULDBLOCK。
- 调用accept函数时创建的套接字同样具有非阻塞属性
所以针对非阻塞套接字调用accept函数并返回INVALID_SOCKET时,应该通过WSAGetLastError函数确认返回INVALID_SOCKET的理由,然后进行处理
? ?
使用重叠IO方法实现类似IOCP的服务端代码
? ?
#include <stdio.h>
#include <stdlib.h>
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
#pragma warning(disable: 4996)
? ?
#define BUF_SIZE 1024
void CALLBACK ReadCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void CALLBACK WriteCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void ErrorHandling(const char * message);
? ?
typedef struct
{
SOCKET hClntSock;
char buf[BUF_SIZE];
WSABUF wsaBuf;
} PER_IO_DATA, * LPPER_IO_DATA;
? ?
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hListenSock, hRecvSock;
SOCKADDR_IN listenAddr, recvAddr;
LPWSAOVERLAPPED lpOvlp;
DWORD recvBytes;
LPPER_IO_DATA hbInfo;
int mode = 1, recvAddrSz, flagInfo = 0;
? ?
if (argc != 2)
{
printf("Usage: %s <port> \n", argv[0]);
exit(1);
}
? ?
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("WSAStartup() error");
? ?
hListenSock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
ioctlsocket(hListenSock, FIONBIO, &mode);
? ?
memset(&listenAddr, 0, sizeof(listenAddr));
listenAddr.sin_family = AF_INET;
listenAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
listenAddr.sin_port = htons(atoi(argv[1]));
? ?
if (bind(hListenSock, (SOCKADDR*)&listenAddr, sizeof(listenAddr)) != 0)
ErrorHandling("bind() error");
if (listen(hListenSock, 5) != 0)
ErrorHandling("listen() error");
? ?
recvAddrSz = sizeof(recvAddr);
while (1)
{
SleepEx(100, TRUE); // for alertable wait state
hRecvSock = accept(hListenSock, (SOCKADDR*)&recvAddr, &recvAddrSz);
if (hRecvSock == INVALID_SOCKET)
{
if (WSAGetLastError() == WSAEWOULDBLOCK)
continue;
else
ErrorHandling("accept() error");
}
puts("Client connected...");
? ?
lpOvlp = (LPWSAOVERLAPPED)malloc(sizeof(WSAOVERLAPPED));
memset(lpOvlp, 0, sizeof(WSAOVERLAPPED));
? ?
hbInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
hbInfo->hClntSock = hRecvSock;
(hbInfo->wsaBuf).buf = hbInfo->buf;
(hbInfo->wsaBuf).len = BUF_SIZE;
? ?
lpOvlp->hEvent = (HANDLE)hbInfo;
WSARecv(hRecvSock, &(hbInfo->wsaBuf), 1, &recvBytes, &flagInfo, lpOvlp, ReadCompRoutine);
? ?
}
closesocket(hRecvSock);
closesocket(hListenSock);
WSACleanup();
? ?
return 0;
}
? ?
void CALLBACK ReadCompRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{
LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);
SOCKET hSock = hbInfo->hClntSock;
LPWSABUF bufInfo = &(hbInfo->wsaBuf);
DWORD sentBytes;
? ?
if (szRecvBytes == 0)
{
closesocket(hSock);
free(lpOverlapped->hEvent);
free(lpOverlapped);
}
else //echo
{
bufInfo->len = szRecvBytes;
WSASend(hSock, bufInfo, 1, &sentBytes, 0, lpOverlapped, WriteCompRoutine);
}
}
void CALLBACK WriteCompRoutine(DWORD dwError, DWORD szSendBytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{
LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);
SOCKET hSock = hbInfo->hClntSock;
LPWSABUF bufInfo = &(hbInfo->wsaBuf);
DWORD recvBytes;
? ?
int flagInfo = 0;
bufInfo->len = BUF_SIZE;
WSARecv(hSock, bufInfo, 1, &recvBytes, &flagInfo, lpOverlapped, ReadCompRoutine);
}
void ErrorHandling(const char* message)
{
fprintf(stderr, "%s, Error Code %d \n", message, WSAGetLastError());
exit(1);
}
? ?
可以很容易明白就是对于每一个连接起来的客户端创建了一个ReadCompRoutine和WriteCompRoutine互相调用的环,而这个圆环的接缝就是Sleepex函数。
接着也就很容易明白为什么需要非阻塞的套接字。
同时可以看到每次连接到了一个客户端都会创建一个PER_IO_DATA结构体,以便将新创建的套接字句柄和缓冲信息传递给ReadCompRoutine函数和WriteCompRoutine函数。该结构体的地址由WSAOVERLAPPED结构体成员的hEvent保存。因为在使用Completion Routine的重叠IO中,不需要使用WSAOVERLAPPED结构体的hEvent成员(其他的成员还需要使用?)。
? ?
客户端代码
? ?
#include <stdio.h>
#include <stdlib.h>
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib")
#pragma warning(disable: 4996)
? ?
#define BUF_SIZE 1024
void ErrorHandling(const char* message);
? ?
int main(int argc, char* argv[])
{
WSADATA wsaData;
SOCKET hSocket;
SOCKADDR_IN servAddr;
char message[BUF_SIZE];
int strLen, readLen;
? ?
if (argc != 3)
{
printf("Usage: %s <IP> <port> \n", argv[0]);
exit(1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("socket() error");
? ?
hSocket = socket(PF_INET, SOCK_STREAM, 0);
? ?
memset(&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.S_un.S_addr = inet_addr(argv[1]);
servAddr.sin_port = htons(atoi(argv[2]));
? ?
if (connect(hSocket, (SOCKADDR*)& servAddr, sizeof(servAddr)) != 0)
ErrorHandling("connect() error");
else
puts("Connected...");
? ?
while (1)
{
fputs("Input message(Q to quit): ", stdout);
fgets(message, BUF_SIZE, stdin);
if (!strcmp(message, "q\n") || !strcmp(message, "Q\n"))
break;
? ?
strLen = strlen(message);
send(hSocket, message, strLen, 0);
readLen = 0;
while (1)
{
readLen += recv(hSocket, &message[readLen], BUF_SIZE - 1, 0);
if (readLen >= strLen) // 这样子读取的话存在有一个问题,socket缓冲区里的其他怎么办
break;
}
message[strLen] = 0;
printf("Message from server: %s", message);
}
closesocket(hSocket);
WSACleanup();
? ?
return 0;
}
? ?
void ErrorHandling(const char* message)
{
fprintf(stderr, "%s, Error Code %d \n", message, WSAGetLastError());
exit(1);
}
? ?
从重叠IO模型到IOCP模型
重叠IO模型回声服务器端存在的缺点:
重复调用非阻塞模式的accept函数和以进入alertable wait状态为目的的SleepEx函数将影响性能。
这属于重叠IO结构固有的缺陷。
? ?
但可以让main线程(main函数内部)调用accept函数,在单独创建线程负责客户端IO。这其实就是IOCP采用的服务器端模型。即IOCP将创建专用的IO线程,该线程负责于所有客户端进行IO。
? ?
创建完成端口
IOCP中已完成的IO信息将注册到完成端口对象(Completion Port,简称CP对象)。但这个过程需要先经过如下请求过程:当该套接字的IO完成时,请把状态信息注册到指定的CP对象。该过程称为"套接字和CP对象之间的连接请求"
so,首先要做如下两步工作:
- 创建完成端口对象
- 将完成套接字和指定的完成端口对象绑定
? ?
#include <windows.h>
HANDLE CreateIoCompletionPort(HANDLE FileHandle,
HANDLE ExistingCompletionPort, ULONG_PTR CompletionKey,
DWORD NumberOfConcurrentThreads);
->>成功返回cp对象句柄,失败返回NULL
? ?
FileHandle 创建CP对象时传递INVALID_HANDLE_VALUE
ExistingCompletionPort 创建CP对象时传递NULL
CompletionKey 创建CP对象时传递0
NumberOfConcurrentThreads 分配给CP对象用于处理IO的线程数。当该参数为0时,系统中cpu个数作为默认值
? ?
HANDLE hCpObject;
…
hCpObject = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 2);
? ?
连接完成端口和套接字
#include <windows.h>
HANDLE CreateIoCompletionPort(HANDLE FileHandle,
HANDLE ExistingCompletionPort, ULONG_PTR CompletionKey,
DWORD NumberOfConcurrentThreads);
->>成功时返回CP对象句柄,失败时返回NULL
? ?
FileHandle 要连接到CP对象的套接字句柄
ExistingCompletionPort 要连接套接字的CP对象句柄
CompletionKey 传递已完成IO相关信息
NumberOfConcurrentThreads 无论传递何值,只要该函数的第二个参数非NULL就会被忽略
? ?
HANDLE hCpObject;
SOCKET hSock;
…
CreateIoCompletionPort((HANDLE)hSock, hCpObject, (DWORD)ioInfo, 0);
? ?
确认完成端口已完成的IO和线程的IO处理
#include <windows.h>
BOOL GetQueuedCompletionStatus(HANDLE CompletionPort,
LPDWORD lpNumberOfBytes, PULONG_PTR lpCompletionKey,
LPOVERLAPPED * lpOverlapped, DWORD dwMilliseconds);
->> 成功时返回TRUE,失败时返回FALSE
? ?
CompletionPort 注册有已完成IO信息的CP对象句柄
lpNumberOfBytes 用于保存IO过程中传输数据大小的变量地址值
lpCompletionKey 用于保存CreateIoCompletionPort函数的第三个参数值的变量地址值
lpOverlapped 用于保存调用WSASend、WSARecv函数时传递的OVERLAPPED结构体变量的地址
dwMilliseconds 超时信息。超过指定时间后返回FALSE并跳出函数。传递INFINITE时,阻塞。
? ?
其中lpCompletionKey以及lpOverlapped主要时为了获取所需的信息而设置的。
通过GetQueuedCompletionPort函数的第三个参数得到的是以连接套接字和CP对象为目的目的而调用的CreateCompletionPort函数的第三个参数值
通过GetQueueCompletionStatus函数的第四个参数得到的是调用WSASend、WSARecv函数时传入的WSAOVERLAPPED结构体变量地址值。
? ?
IOCP中将创建全职IO线程,有该线程针对所有客户端进行IO。而且CreateIoCompletionPort函数中也有参数用于指定分配给CP对象的最大线程数。
? ?
? ?
#include <stdio.h>
#include <stdlib.h>
#include <process.h>
#include <winsock2.h>
#include <windows.h>
#pragma comment(lib, "ws2_32.lib")
#pragma warning(disable: 4996)
? ?
#define BUF_SIZE 1024
#define READ 3
#define WRITE 5
? ?
typedef struct //socket info
{
SOCKET hClntSock;
SOCKADDR_IN clntAddr;
} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
? ?
typedef struct // buffer info
{
OVERLAPPED overlapped;
WSABUF wsaBuf;
char buffer[BUF_SIZE];
int rwMode; // READ or WRITE
} PER_IO_DATA, *LPPER_IO_DATA;
? ?
DWORD WINAPI EchoThreadMain(LPVOID CompletionPortIO);
void ErrorHandling(char *message);
? ?
int main(int argc, char* argv[])
{
WSADATA wsaData;
HANDLE hComPort;
SYSTEM_INFO sysInfo;
LPPER_IO_DATA ioInfo;
LPPER_HANDLE_DATA handleInfo;
? ?
SOCKET hServSock;
SOCKADDR_IN servAddr;
int recvBytes, i, flags = 0;
? ?
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("WSAStartup() Error");
? ?
hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
? ?
GetSystemInfo(&sysInfo);
for (i = 0; i < sysInfo.dwNumberOfProcessors; i++)
_beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);
? ?
hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); // 必须要使用重叠IO套接字
memset(&servAddr, 0, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
servAddr.sin_port = htons(atoi(argv[1]));
? ?
if (bind(hServSock, (SOCKADDR*)& servAddr, sizeof(servAddr)) != 0)
ErrorHandling("bind() error");;
if(listen(hServSock, 5) != 0)
ErrorHandling("listen() error");
? ?
while (1)
{
SOCKET hClntSock;
SOCKADDR_IN clntAddr;
int addrLen = sizeof(clntAddr);
? ?
hClntSock = accept(hServSock, (SOCKADDR*)&clntAddr, &addrLen);
? ?
handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));
handleInfo->hClntSock = hClntSock;
memcpy(&(handleInfo->clntAddr), &clntAddr, addrLen);
? ?
CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);
? ?
ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
ioInfo->wsaBuf.len = BUF_SIZE;
ioInfo->wsaBuf.buf = ioInfo->buffer;
ioInfo->rwMode = READ;
WSARecv(handleInfo->hClntSock, &(ioInfo->wsaBuf), 1,
&recvBytes, &flags, &(ioInfo->overlapped), NULL);
? ?
}
? ?
return 0;
}
? ?
DWORD WINAPI EchoThreadMain(LPVOID pComPort)
{
puts("I am Thread");
HANDLE hComPort = (HANDLE)pComPort;
SOCKET sock;
DWORD bytesTrans;
LPPER_HANDLE_DATA handleInfo;
LPPER_IO_DATA ioInfo;
DWORD flags = 0;
? ?
while (1)
{
GetQueuedCompletionStatus(hComPort, &bytesTrans, (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);
sock = handleInfo->hClntSock;
? ?
if (ioInfo->rwMode == READ)
{
puts("message received");
if (bytesTrans == 0) // 传输EOF时
{
closesocket(sock);
free(handleInfo);
free(ioInfo);
continue;
}
? ?
memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
ioInfo->wsaBuf.len = bytesTrans;
ioInfo->rwMode = WRITE;
WSASend(sock, &(ioInfo->wsaBuf), 1, NULL, 0, &(ioInfo->overlapped), NULL);
? ?
ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));
memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
ioInfo->wsaBuf.len = BUF_SIZE;
ioInfo->wsaBuf.buf = ioInfo->buffer;
ioInfo->rwMode = READ;
WSARecv(sock, &(ioInfo->wsaBuf), 1, NULL, &flags, &(ioInfo->overlapped), NULL);
}
else
{
puts("message send!");
free(ioInfo);
}
}
return 0;
}
void ErrorHandling(char * message)
{
fprintf(stderr, "%s, Error Code %d \n", message, WSAGetLastError());
exit(1);
}
? ?
在此段代码中你大概会注意到,在WSARecv之前动态申请了一个PER_IO_DATA的结构体空间。一般逻辑上看来,就算是直接使用一开始的PER_IO_DATA结构体也未尝不可,正常看来应该不会出现什么问题,然而恰恰他就出现了问题。
问题原因暂时还不知道,是的。我琢磨了一下,还是没有想的很明白。但是好像有了一点点的头绪。
? ?
IOCP性能更优的原因
在代码层面上与select进行比较,发现如下特点
- 非阻塞IO,没有IO延迟
- 无需循环查早已完成IO
- 不需管理IO套接字
- 可以选用合适的处理IO线程数