标签:
IOCP是windows下IO事件处理的最高效的一种方式了,结合OVERLAPPED IO可以实现真正的完全异步IO。windows在此种模式下提供了一站式服务,只要你提交一个IO请求,接下来windows替你处理其他所有的工作,你只需要等着接受windows的完成通知就行了。
响马大叔在他的孢子社区有了一个帖子再谈select, iocp, epoll,kqueue及各种I/O复用机制对此有比较全面的对比介绍了,故而本文不对IOCP这方面的内容再做赘述了,相反说说自己在自己开发过程中认为IOCP不好的地方。
IOCP不好的地方体现这个地方:一个File/Socket Handle是不能多次调用CreateIoCompletionPort()绑定到不同的IOCP上的,只有第一次是成功的,第二次开始是参数错误失败!因此一旦绑定了一个IOCP就没法迁移到其他的IOCP了,这个是我经过实际的代码测试和分析ReactOS代码实现得出的结论。测试代码如下
1 int main(int argc, char *argv[]) 2 { 3 HANDLE iocp; 4 HANDLE iocp1; 5 SOCKET s; 6 HANDLE ret; 7 8 WSADATA wsa_data; 9 WSAStartup(MAKEWORD(2, 2), &wsa_data); 10 11 iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4); 12 iocp1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4); 13 s = create_client_socket(); 14 15 assert(NULL != iocp); 16 assert(NULL != iocp1); 17 18 ret = CreateIoCompletionPort((HANDLE)s, iocp, 0, 0); 19 printf("first bind, ret: %lu, error: %u\n", (long)ret, GetLastError()); 20 21 ret = CreateIoCompletionPort((HANDLE)s, iocp1, 0, 0); 22 printf("second bind, ret: %lu, error: %u\n", (long)ret, GetLastError()); 23 24 CloseHandle(iocp); 25 CloseHandle(iocp1); 26 closesocket(s); 27 28 WSACleanup(); 29 30 return 0; 31 }
运行结果
Administrator@attention /e/tinylib/windows/net_iocp
$ iocp.exe
first bind, ret: 60, error: 0
second bind, ret: 0, error: 87
ReactOS-0.3.12-REL-src的代码体现在NtSetInformationFile()中以下代码片段
1 /* FIXME: Later, we can implement a lot of stuff here and avoid a driver call */ 2 /* Handle IO Completion Port quickly */ 3 if (FileInformationClass == FileCompletionInformation) 4 { 5 /* Check if the file object already has a completion port */ 6 if ((FileObject->Flags & FO_SYNCHRONOUS_IO) || 7 (FileObject->CompletionContext)) 8 { 9 /* Fail */ 10 Status = STATUS_INVALID_PARAMETER; 11 } 12 else 13 { 14 /* Reference the Port */ 15 CompletionInfo = Irp->AssociatedIrp.SystemBuffer; 16 Status = ObReferenceObjectByHandle(CompletionInfo->Port, 17 IO_COMPLETION_MODIFY_STATE, 18 IoCompletionType, 19 PreviousMode, 20 (PVOID*)&Queue, 21 NULL); 22 if (NT_SUCCESS(Status)) 23 { 24 /* Allocate the Context */ 25 Context = ExAllocatePoolWithTag(PagedPool, 26 sizeof(IO_COMPLETION_CONTEXT), 27 IOC_TAG); 28 if (Context) 29 { 30 /* Set the Data */ 31 Context->Key = CompletionInfo->Key; 32 Context->Port = Queue; 33 if (InterlockedCompareExchangePointer((PVOID*)&FileObject-> 34 CompletionContext, 35 Context, 36 NULL)) 37 { 38 /* 39 * Someone else set the completion port in the 40 * meanwhile, so dereference the port and fail. 41 */ 42 ExFreePool(Context); 43 ObDereferenceObject(Queue); 44 Status = STATUS_INVALID_PARAMETER; 45 } 46 } 47 else 48 { 49 /* Dereference the Port now */ 50 ObDereferenceObject(Queue); 51 Status = STATUS_INSUFFICIENT_RESOURCES; 52 } 53 } 54 } 55 56 /* Set the IRP Status */ 57 Irp->IoStatus.Status = Status; 58 Irp->IoStatus.Information = 0; 59 }
MSDN中也明确提倡开发者启动多个线程使用GetQueuedCompletionStatus()挂在一个IOCP上来处理IO事件,我是如此理解了的,原文如下
Although any number of threads can call the GetQueuedCompletionStatus function to wait for an I/O completion port, each thread is associated with only one completion port at a time. That port is the port that was last checked by the thread.
可这对应有另外一个问题:会导致同一个IO handle的完成事件被分散到不同的线程中处理,从而在处理同一个handle的IO事件时会引入额外的并发竞争,对此我也写了代码进行测试确认,如下
1 /* 2 编译命令 3 gcc iocp.c -o iocp -lws2_32 -g 4 5 测试命令 6 nc -u 192.168.100.101 1993 7 快速反复发送数据 8 9 实际运行结果 10 Administrator@attention /e/code 11 $ gdb -q iocp.exe 12 Reading symbols from e:\code\iocp.exe...done. 13 (gdb) r 14 Starting program: e:\code\iocp.exe 15 [New Thread 5252.0x1330] 16 [New Thread 5252.0xcf0] 17 thread: 3312, 5 bytes received for 168 notified by IOCP 18 thread: 4912, 3 bytes received for 168 notified by IOCP 19 thread: 3312, 6 bytes received for 168 notified by IOCP 20 thread: 4912, 5 bytes received for 168 notified by IOCP 21 thread: 4912, 2 bytes received for 168 notified by IOCP 22 thread: 4912, 3 bytes received for 168 notified by IOCP 23 thread: 3312, 4 bytes received for 168 notified by IOCP 24 */ 25 26 #include <stdio.h> 27 #include <stdlib.h> 28 29 #define WIN32_LEAN_AND_MEAN 30 #include <windows.h> 31 #include <winsock2.h> 32 #include <process.h> 33 34 HANDLE iocp; 35 SOCKET s_udp; 36 37 void routine(void) 38 { 39 unsigned threadId; 40 41 ULONG_PTR key; 42 LPOVERLAPPED povlp; 43 BOOL result; 44 45 char buffer[65535]; 46 WSABUF wsabuf; 47 DWORD received; 48 DWORD flag; 49 struct sockaddr_in peer_addr; 50 int addr_len; 51 WSAOVERLAPPED ovlp; 52 int error; 53 54 while (1) 55 { 56 wsabuf.len = sizeof(buffer); 57 wsabuf.buf = buffer; 58 received = 0; 59 flag = 0; 60 addr_len = sizeof(peer_addr); 61 memset(&peer_addr, 0, addr_len); 62 memset(&ovlp, 0, sizeof(ovlp)); 63 64 threadId = GetCurrentThreadId(); 65 66 if (WSARecvFrom(s_udp, &wsabuf, 1, &received, &flag, (struct sockaddr*)&peer_addr, &addr_len, &ovlp, NULL) == 0) 67 { 68 printf("thread: %u, %u bytes received for %lu imediately\n", threadId, received, s_udp); 69 continue; 70 } 71 72 result = GetQueuedCompletionStatus(iocp, &received, &key, &povlp, 10); 73 if (FALSE == result) 74 { 75 error = WSAGetLastError(); 76 if (WAIT_TIMEOUT != error) 77 { 78 printf("GetQueuedCompletionStatus() failed, error: %d\n", error); 79 } 80 continue; 81 } 82 83 printf("thread: %u, %u bytes received fro %lu notified by IOCP\n", threadId, received, s_udp); 84 } 85 86 return; 87 } 88 89 unsigned __stdcall thread(void *arg) 90 { 91 routine(); 92 93 return 0; 94 } 95 96 SOCKET create_udp_socket(unsigned short port, const char *ip) 97 { 98 SOCKET fd; 99 struct sockaddr_in addr; 100 unsigned long value = 1; 101 102 fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED); 103 if (INVALID_SOCKET == fd) 104 { 105 printf("create_udp_socket: socket() failed, errno: %d", WSAGetLastError()); 106 return INVALID_SOCKET; 107 } 108 109 memset(&addr, 0, sizeof(addr)); 110 addr.sin_family = AF_INET; 111 addr.sin_addr.s_addr = (NULL != ip ? inet_addr(ip) : INADDR_ANY); 112 addr.sin_port = htons(port); 113 if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) 114 { 115 printf("create_server_socket: bind() failed, erron: %d", WSAGetLastError()); 116 closesocket(fd); 117 return INVALID_SOCKET; 118 } 119 120 return fd; 121 } 122 123 int main(int argc, char *argv[]) 124 { 125 unsigned threadId; 126 HANDLE t; 127 WSADATA wsadata; 128 129 WSAStartup(MAKEWORD(2,2), &wsadata); 130 131 iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4); 132 s_udp = create_udp_socket(1993, "0.0.0.0"); 133 CreateIoCompletionPort((HANDLE)s_udp, iocp, 0, 0); 134 135 t = (HANDLE)_beginthreadex(NULL, 0, thread, NULL, 0, &threadId); 136 137 routine(); 138 139 WaitForSingleObject(t, INFINITE); 140 CloseHandle(t); 141 closesocket(s_udp); 142 CloseHandle(iocp); 143 144 WSACleanup(); 145 146 return 0; 147 }
如此的话,由于这些并发竞争的存在实际上差不多抵消了开多个线程进行并发处理的好处,还不如将所有的IO事件全部放在同一个线程中进行处理,还能省去很多锁的开销。不过现代的程序几乎完全是在多核的CPU上运行的,如果因为IOCP,你让所有相关的工作全部放在一个线程里进行处理,又不能充分利用多核的并行优势。实际上我们在设计并发模型时,经常开多个worker来实现负载均衡,但IOCP以上的限制是与之相冲突的。
linux下的epoll就额外提供了del操作,可以使得一个fd可以随时从当期的epoll中detach出去,又立马add进另外一个epoll,如此的话就可以开多个worker线程开跑多个epoll,从而可以将不同fd均摊到不同的worker中实现负载均衡。这种均衡操作在实际的业务中是很常见的,会需要你根据业务逻辑,将不同的fd交给其他的线程来处理,若使用IOCP的话就不太方便了。
这些就算是我对IOCP吐槽的一个地方了。
~~end~~
标签:
原文地址:http://www.cnblogs.com/lanyuliuyun/p/4215568.html