感谢rulary的指正!博文中我对IOCP的理解是有误的,正确的方式请见评论区rulary的回复!
由于项目实际设计的需要,最终IO事件处理没有采用IOCP,而是采用了NT6.0引入的WSAPoll,其编程模型和linux下poll基本一致,此处就不赘述了!
==================================================
IOCP是windows下IO事件处理的最高效的一种方式了,结合OVERLAPPED IO可以实现真正的完全异步IO。windows在此种模式下提供了一站式服务,只要你提交一个IO请求,接下来windows替你处理其他所有的工作,你只需要等着接受windows的完成通知就行了。
响马大叔在他的孢子社区有了一个帖子再谈select, iocp, epoll,kqueue及各种I/O复用机制对此有比较全面的对比介绍了,故而本文不对IOCP这方面的内容再做赘述了,相反说说自己在自己开发过程中认为IOCP不好的地方。
IOCP不好的地方体现这个地方:一个File/Socket Handle是不能多次调用CreateIoCompletionPort()绑定到不同的IOCP上的,只有第一次是成功的,第二次开始是参数错误失败!因此一旦绑定了一个IOCP就没法迁移到其他的IOCP了,这个是我经过实际的代码测试和分析ReactOS代码实现得出的结论。测试代码如下
int main(int argc, char *argv[])
{
HANDLE iocp;
HANDLE iocp1;
SOCKET s;
HANDLE ret; WSADATA wsa_data;
WSAStartup(MAKEWORD(, ), &wsa_data); iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, , );
iocp1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, , );
s = create_client_socket(); assert(NULL != iocp);
assert(NULL != iocp1); ret = CreateIoCompletionPort((HANDLE)s, iocp, , );
printf("first bind, ret: %lu, error: %u\n", (long)ret, GetLastError()); ret = CreateIoCompletionPort((HANDLE)s, iocp1, , );
printf("second bind, ret: %lu, error: %u\n", (long)ret, GetLastError()); CloseHandle(iocp);
CloseHandle(iocp1);
closesocket(s); WSACleanup(); return ;
}
运行结果
Administrator@attention /e/tinylib/windows/net_iocp
$ iocp.exe
first bind, ret: 60, error: 0
second bind, ret: 0, error: 87
ReactOS-0.3.12-REL-src的代码体现在NtSetInformationFile()中以下代码片段
/* FIXME: Later, we can implement a lot of stuff here and avoid a driver call */
/* Handle IO Completion Port quickly */
if (FileInformationClass == FileCompletionInformation)
{
/* Check if the file object already has a completion port */
if ((FileObject->Flags & FO_SYNCHRONOUS_IO) ||
(FileObject->CompletionContext))
{
/* Fail */
Status = STATUS_INVALID_PARAMETER;
}
else
{
/* Reference the Port */
CompletionInfo = Irp->AssociatedIrp.SystemBuffer;
Status = ObReferenceObjectByHandle(CompletionInfo->Port,
IO_COMPLETION_MODIFY_STATE,
IoCompletionType,
PreviousMode,
(PVOID*)&Queue,
NULL);
if (NT_SUCCESS(Status))
{
/* Allocate the Context */
Context = ExAllocatePoolWithTag(PagedPool,
sizeof(IO_COMPLETION_CONTEXT),
IOC_TAG);
if (Context)
{
/* Set the Data */
Context->Key = CompletionInfo->Key;
Context->Port = Queue;
if (InterlockedCompareExchangePointer((PVOID*)&FileObject->
CompletionContext,
Context,
NULL))
{
/*
* Someone else set the completion port in the
* meanwhile, so dereference the port and fail.
*/
ExFreePool(Context);
ObDereferenceObject(Queue);
Status = STATUS_INVALID_PARAMETER;
}
}
else
{
/* Dereference the Port now */
ObDereferenceObject(Queue);
Status = STATUS_INSUFFICIENT_RESOURCES;
}
}
} /* Set the IRP Status */
Irp->IoStatus.Status = Status;
Irp->IoStatus.Information = ;
}
MSDN中也明确提倡开发者启动多个线程使用GetQueuedCompletionStatus()挂在一个IOCP上来处理IO事件,我是如此理解了的,原文如下
- NumberOfConcurrentThreads
- [in] Maximum number of threads that the operating system allows to concurrently process I/O completion packets for the I/O completion port. If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
Although any number of threads can call the GetQueuedCompletionStatus function to wait for an I/O completion port, each thread is associated with only one completion port at a time. That port is the port that was last checked by the thread.
可这对应有另外一个问题:会导致同一个IO handle的完成事件被分散到不同的线程中处理,从而在处理同一个handle的IO事件时会引入额外的并发竞争,对此我也写了代码进行测试确认,如下
/*
编译命令
gcc iocp.c -o iocp -lws2_32 -g 测试命令
nc -u 192.168.100.101 1993
快速反复发送数据 实际运行结果
Administrator@attention /e/code
$ gdb -q iocp.exe
Reading symbols from e:\code\iocp.exe...done.
(gdb) r
Starting program: e:\code\iocp.exe
[New Thread 1984.0x1788]
[New Thread 1984.0x914]
thread: 6024, 3 bytes received fro 168 notified by IOCP
thread: 6024, 3 bytes received fro 168 notified by IOCP
thread: 6024, 3 bytes received fro 168 notified by IOCP
thread: 6024, 4 bytes received fro 168 notified by IOCP
thread: 6024, 3 bytes received fro 168 notified by IOCP
thread: 2324, 4 bytes received fro 168 notified by IOCP
thread: 2324, 2 bytes received fro 168 notified by IOCP
thread: 2324, 4 bytes received fro 168 notified by IOCP
thread: 2324, 3 bytes received fro 168 notified by IOCP
thread: 6024, 5 bytes received fro 168 notified by IOCP
thread: 2324, 4 bytes received fro 168 notified by IOCP
thread: 2324, 4 bytes received fro 168 notified by IOCP
thread: 2324, 3 bytes received fro 168 notified by IOCP
thread: 6024, 4 bytes received fro 168 notified by IOCP
thread: 6024, 4 bytes received fro 168 notified by IOCP
thread: 6024, 2 bytes received fro 168 notified by IOCP
*/ #include <stdio.h>
#include <stdlib.h> #define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <winsock2.h>
#include <process.h> HANDLE iocp;
SOCKET s_udp; void routine(void)
{
unsigned threadId; ULONG_PTR key;
LPOVERLAPPED povlp;
BOOL result; char buffer[];
WSABUF wsabuf;
DWORD received;
DWORD flag;
struct sockaddr_in peer_addr;
int addr_len;
WSAOVERLAPPED ovlp;
int error; do
{
wsabuf.len = sizeof(buffer);
wsabuf.buf = buffer;
received = ;
flag = ;
addr_len = sizeof(peer_addr);
memset(&peer_addr, , addr_len);
memset(&ovlp, , sizeof(ovlp)); threadId = GetCurrentThreadId(); if (WSARecvFrom(s_udp, &wsabuf, , &received, &flag, (struct sockaddr*)&peer_addr, &addr_len, &ovlp, NULL) == )
{
printf("thread: %u, %u bytes received for %lu imediately\n", threadId, received, s_udp);
continue;
} while ()
{
result = GetQueuedCompletionStatus(iocp, &received, &key, &povlp, );
if (FALSE == result)
{
error = WSAGetLastError();
if (WAIT_TIMEOUT != error)
{
printf("GetQueuedCompletionStatus() failed, error: %d\n", error);
}
continue;
} printf("thread: %u, %u bytes received fro %lu notified by IOCP\n", threadId, received, s_udp);
break;
}
} while (); return;
} unsigned __stdcall thread(void *arg)
{
routine(); return ;
} SOCKET create_udp_socket(unsigned short port, const char *ip)
{
SOCKET fd;
struct sockaddr_in addr;
unsigned long value = ; fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, , WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == fd)
{
printf("create_udp_socket: socket() failed, errno: %d", WSAGetLastError());
return INVALID_SOCKET;
} memset(&addr, , sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = (NULL != ip ? inet_addr(ip) : INADDR_ANY);
addr.sin_port = htons(port);
if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != )
{
printf("create_server_socket: bind() failed, erron: %d", WSAGetLastError());
closesocket(fd);
return INVALID_SOCKET;
} return fd;
} int main(int argc, char *argv[])
{
unsigned threadId;
HANDLE t;
WSADATA wsadata; WSAStartup(MAKEWORD(,), &wsadata); iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, , );
s_udp = create_udp_socket(, "0.0.0.0");
CreateIoCompletionPort((HANDLE)s_udp, iocp, , ); t = (HANDLE)_beginthreadex(NULL, , thread, NULL, , &threadId); routine(); WaitForSingleObject(t, INFINITE);
CloseHandle(t);
closesocket(s_udp);
CloseHandle(iocp); WSACleanup(); return ;
}
如此的话,由于这些并发竞争的存在实际上差不多抵消了开多个线程进行并发处理的好处,还不如将所有的IO事件全部放在同一个线程中进行处理,还能省去很多锁的开销。不过现代的程序几乎完全是在多核的CPU上运行的,如果因为IOCP,你让所有相关的工作全部放在一个线程里进行处理,又不能充分利用多核的并行优势。实际上我们在设计并发模型时,经常开多个worker来实现负载均衡,但IOCP以上的限制是与之相冲突的。
linux下的epoll就额外提供了del操作,可以使得一个fd可以随时从当期的epoll中detach出去,又立马add进另外一个epoll,如此的话就可以开多个worker线程开跑多个epoll,可以将不同fd均摊到不同的worker中实现负载均衡,同时又可以随意的将fd从一个线程迁移到另外一个线程进行处理。这种均衡操作在实际的业务中是很常见的,会需要你根据业务逻辑,将不同的fd交给其他的线程来处理,若使用IOCP的话就不太方便了。
这些就算是我对IOCP吐槽的一个地方了。
~~end~~
===== 分割线 =====
看来我的意思还说得不很清楚,补充一下:
本意也是多个线程跑多个处理循环,在每个循环里都拥有一个IOCP,处理不同的socket,但业务逻辑需要将一个socket从一个线程的处理循环中迁移到另外的一个线程的处理循环,但上面所述的IOCP的限制,没法绑定到新线程的IOCP中,从而没法进行迁移!
但是开多个线程挂在同一个IOCP上,又有上面所说的并发竞争的问题!
===== 分割线 =====
得再补充一些情况:
实际业务情况是这样的:我们这边有两个不同的服务,但奈何由于一些我们不能自主的原因,两个服务的请求只能从一个端口进来。来了一个连接之后,得先接收一小段数据才能知道到底请求哪个,但这两个服务是在不同的线程循环里实现的。所以额外有一个入口server线程来负责接受请求,并收取这小段数据。若使用IOCP,那么接收到的连接就得先绑定到该入口server的IOCP里头了,但一旦绑定就没法迁移出去了,但实际后续两个服务又需要在各自的循环里进一步在接受到的连接上进行数据收发处理。
本身开始设计实现的时候自然是想到各自拥有一个IOCP各自处理的不同的连接,不做迁移,但实际却由于这些原因产生了迁移需求。此乃谓之蛋疼也!