转载请注明来源:http://blog.csdn.net/caoshiying?viewmode=contents
一、回想重叠IO模型
完毕例程内的实现代码比較简单,它取出接收到的数据,然后将数据原封不动的发送给client。最后又一次激活还有一个WSARecv异步操作。注意,在这里用到了“跟随数据”。我们在调用WSARecv的时候,參数lpOverlapped实际上指向一个比它大得多的结构PER_IO_OPERATION_DATA,这个结构除了WSAOVERLAPPED以外。还被我们附加了缓冲区的结构信息,另外还包括client套接字等重要的信息。这样。在完毕例程中通过參数lpOverlapped拿到的不不过WSAOVERLAPPED结构,还有后边跟随的包括client套接字和接收数据缓冲区等重要信息。这种C语言技巧在我介绍完毕port的时候还会使用到。
二、完毕port模型
“完毕port”模型是迄今为止最为复杂的一种I/O模型。
然而,假若一个应用程序同一时候须要管理为数众多的套接字,那么採用这样的模型,往往能够达到最佳的系统性能!
但不幸的是。该模型仅仅适用于Windows NT和Windows 2000操作系统。
因其设计的复杂性,仅仅有在你的应用程序须要同一时候管理数百乃至上千个套接字的时候,并且希望随着系统内安装的CPU数量的增多,应用程序的性能也能够线性提升。才应考虑採用“完毕port”模型。
要记住的一个基本准则是。假如要为Windows NT或Windows 2000开发高性能的server应用。同一时候希望为大量套接字I/O请求提供服务(Webserver便是这方面的典型样例)。那么I/O完毕port模型便是最佳选择!
完毕port模型是我最喜爱的一种模型。尽管事实上现比較复杂(事实上我认为它的实现比用事件通知实现的重叠I/O简单多了)。但其效率是惊人的。
我在T公司的时候以前帮同事写过一个邮件server的性能測试程序,用的就是完毕port模型。
结果表明。完毕port模型在多连接(成千上万)的情况下。只依靠一两个辅助线程。就能够达到很高的吞吐量。
三、关键函数
1、CreateIoCompletionPort
创建一个输入/输出(I / O)完毕port,并将其与一个指定的文件句柄关联。或者创建一个尚未与文件句柄关联的I / O完毕port,同意在稍后的时间关联。将已打开的文件句柄的实例与一个I / O完毕port关联,同意一个进程接收包括该文件句柄的异步I / O操作完毕的通知。注意:这里所使用的术语文件句柄是指代表一个重叠的I / O端点的系统抽象,而不不过磁盘上的一个文件。不论什么系统对象支持重叠I / o-such网络端点,TCP套接字。命名管道、邮件槽能够作为文件句柄。
函数原型:
HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads );
函数參数:
FileHandle:一个打开的文件句柄或者INVALID_HANDLE_VALUE。这个文件句柄必须是支持重叠IO的object。
假设提供了句柄, 它必须是已经给重叠I/O模型完毕port打开的句柄。比如。假设您使用CreateFile函数获取的句柄,那么您在调用这个函数时必须在參数中指定FILE_FLAG_OVERLAPPED旗标。假设指定 INVALID_HANDLE_VALUE,那么函数将创建一个没有关联文件句柄的IO完毕port模型,此外ExistingCompletionPort參数必须设为NULL,CompletionKey參数将被忽略。
ExistingCompletionPort:是已经存在的完毕port。
假设为NULL。则为新建一个IOCP。
CompletionKey:用户定义的句柄包括的I/O完毕包信息。当FileHandle被设为INVALID_HANDLE_VALUE时此參数被忽略。
NumberOfConcurrentThreads:操作系统能够同意同一时候处理I / O完毕端口的I / O完毕数据包的线程的最大数目。假设existingcompletionport參数不为空,则忽略此參数。假设这个參数为零,系统同意多个并发执行的线程。由于系统中有处理器。
返回值:
假设函数成功,返回值是一个I / O完毕port的句柄:假设ExistingCompletionPort參数为空。返回值是一个新的处理。假设ExistingCompletionPort參数是一个有效的I/O完毕port句柄,返回值是同样的处理。假设文件句柄參数是一个有效的处理,文件处理是如今与返回的I/O完毕port。假设函数失败,返回值为空。为了获得很多其它的错误信息,调用GetLastError函数。
2、GetQueuedCompletionStatus
失望的是微软官方MSDN没有提供关于这个API的说明。下面參照一篇英文文档进行翻译。
文档说这个函数试图将一个I/O完毕包从指定的I/O完毕port。假设没有完毕数据包队列,则函数等待一个挂起的I / O操作与完毕port相关联的完毕。
函数原型:
BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytes, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped, _In_ DWORD dwMilliseconds );
函数參数:
CompletionPort:完毕port的句柄。创建一个完毕port。使用CreateIoCompletionPort函数。
lpNumberOfBytes:指向已完毕的I / O操作期间传输的字节数的变量的指针。
lpCompletionKey:指向与文件句柄关联的完毕键的变量的指针,该键的I / O操作已完毕。一个完毕的关键是每个文件的关键,是指定一个叫CreateIoCompletionPort。
lpOverlapped:一个指向一个变量的指针,该指针指向在已完毕的I / O操作開始时指定的重叠结构的地址的变量。
即使您已经通过了一个与完毕port相关联的文件句柄和一个有效的重叠结构,应用程序也能够防止完毕port通知。这是通过指定的重叠结构的hevent成员有效的事件处理完毕,并设置其低阶位。
一个有效的事件句柄,其低阶位设置将保持I / O完毕从被队列到完毕port。
dwMilliseconds:调用方愿意等待完毕数据包出如今完毕port的毫秒数。
假设一个完毕包没有出如今指定的时间内,功能倍出。返回false。并设置*lpOverlapped为null。
假设该參数是无限的。函数将没有时间了。
假设该參数为零,没有I/O操作中出列,函数将取消等待时间,马上操作。
返回值:
返回非零(真)。假设成功或零(假),否则。为了获得很多其它的错误信息,调用GetLastError。
此功能将一个线程与指定的完毕port关联。
一个线程能够与至多一个完毕port相关联的。假设由于完毕port句柄与它是封闭而调用调用GetQueuedCompletionStatus突出失败。函数返回false。*lpOverlapped会是空的,GetLastError将返回error_abandoned_wait_0。
Windows Server 2003和Windows XP:关闭完毕port句柄,调用优秀不会导致之前的行为。该函数将继续等待直到一项是从港口或直到发生超时删除,假设指定以外的无限价值。
假设GetQueuedCompletionStatus函数调用成功,它出列完毕包一个成功的I/O操作完毕port和存储信息的变量所指向的下列參数:lpNumberOfBytes。lpcompletionkey,和lpOverlapped。在失败(返回值是错误的),这些同样的參数能够包括特定的值组合例如以下:假设*lpOverlapped为空。功能没有出列完毕包从完毕port。在这样的情况下,函数不存储信息在lpNumberOfBytes and lpCompletionKey所指向的參数中,其值是不确定的。
假设*lpOverlapped不空和功能按一个失败的I/O操作的完毕port完毕包的功能。存储信息有关失败操作的变量所指向的lpcompletionkey lpOverlapped lpNumberOfBytes。为了获得很多其它的错误信息。调用GetLastError。
3、PostQueuedCompletionStatus
将一个I / O完毕数据包发送到一个I / O完毕port。I/O完毕包将满足一个优秀的调用GetQueuedCompletionStatus函数。该函数返回三值传递的第二,第三,和第四个參数postqueuedcompletionstatus呼叫。
该系统不使用或验证这些值。特别是。lpOverlapped參数不须要点的重叠结构。
函数原型:
BOOL WINAPI PostQueuedCompletionStatus( _In_ HANDLE CompletionPort, _In_ DWORD dwNumberOfBytesTransferred, _In_ ULONG_PTR dwCompletionKey, _In_opt_ LPOVERLAPPED lpOverlapped );
參数:
CompletionPort:一个I / O完毕数据包的I / O完毕port的句柄。
dwNumberOfBytesTransferred:要通过lpnumberofbytestransferred參数GetQueuedCompletionStatus函数返回的值。0xFFFFFFFF表示处理全部跟随数据。仅仅有准备关闭port的时候才这样做。
dwCompletionKey:能够通过GetQueuedCompletionStatus函数返回的值lpcompletionkey參数。
lpOverlapped:要通过lpOverlapped參数GetQueuedCompletionStatus函数返回的值。
返回值:
假设函数成功。返回值是非零的。假设函数失败,返回值为零。为了获得很多其它的错误信息,调用GetLastError。
四、完整的演示样例程序
#pragma once #define SOCKET_MESSAGE_SIZE 1024 #include <WinSock2.h> #include <common_callback.h> typedef enum { RECV_POSTED }OPERATION_TYPE; typedef struct { WSAOVERLAPPED overlap; WSABUF buffer; char message[SOCKET_MESSAGE_SIZE]; DWORD received_count; DWORD flags; OPERATION_TYPE operation_type; }PEERIO_OPERATION_DATA, *LPPEERIO_OPERATION_DATA; class completeio_server_manager: public iserver_manager { private: int iport; int iaddr_size; common_callback callback; BOOL brunning; SOCKET server; WSADATA wsaData; HANDLE hcomplete_port; SYSTEM_INFO system_info; LPPEERIO_OPERATION_DATA peer_data; bool bdisposed; protected: bool accept_by_crt(); bool accept_by_winapi(); public: void receive(); void shutdown(); void start_receive(); void start_accept(); public: completeio_server_manager(); virtual ~completeio_server_manager(); };
实现文件完整代码例如以下:
#include "completeio_server_manager.h" #include <stdio.h> #include <tchar.h> completeio_server_manager::completeio_server_manager() { iport = 5150; iaddr_size = sizeof(SOCKADDR_IN); brunning = FALSE; GetSystemInfo(&system_info); callback.set_manager(this); callback.set_receive_thread_coount(system_info.dwNumberOfProcessors); hcomplete_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); bdisposed = false; } completeio_server_manager::~completeio_server_manager() { if (bdisposed) shutdown(); } bool completeio_server_manager::accept_by_crt() { return true; } bool completeio_server_manager::accept_by_winapi() { SOCKADDR_IN server_addr; SOCKADDR_IN client_addr; SOCKET client; LPPEERIO_OPERATION_DATA peer_data; int iresult = -1; WSAStartup(MAKEWORD(2, 2), &wsaData); server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(iport); do { iresult = bind(server, (struct sockaddr*)&server_addr, iaddr_size); if (iresult == SOCKET_ERROR) { iport++; server_addr.sin_port = htons(iport); } } while (iresult == -1); listen(server, 3); printf("基于完毕端口模型的Socket服务器启动成功。监听端口是:%d\n", iport); while (brunning) { printf("開始监听请求。\n"); client = accept(server, (struct sockaddr*)&client_addr, &iaddr_size); if (client == SOCKET_ERROR) continue; printf("新客户端连接:%s:%d\n", inet_ntoa(client_addr.sin_addr), htons(client_addr.sin_port)); CreateIoCompletionPort((HANDLE)client, hcomplete_port, (DWORD)client, 0); peer_data = (LPPEERIO_OPERATION_DATA)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PEERIO_OPERATION_DATA)); peer_data->buffer.len = SOCKET_MESSAGE_SIZE; peer_data->buffer.buf = peer_data->message; peer_data->operation_type = RECV_POSTED; printf("開始接收客户端传送数据。\n"); WSARecv(client, &peer_data->buffer, 1, &peer_data->received_count, &peer_data->flags, &peer_data->overlap, NULL); printf("收到客户端数据。\n"); } return true; } void completeio_server_manager::receive() { DWORD dwtransfered = 0; SOCKET client; LPPEERIO_OPERATION_DATA peer = nullptr; while (brunning) { printf("线程:%d,查询端口状态信息。\n",GetCurrentThreadId()); GetQueuedCompletionStatus(hcomplete_port, &dwtransfered, (PULONG_PTR)&client, (LPOVERLAPPED*)&peer, INFINITE); printf("获得端口信息。\n"); if (dwtransfered == 0xFFFFFFFF) return; if (peer->operation_type == RECV_POSTED) { if (dwtransfered == 0) { closesocket(client); printf("有客户端退出了。\n"); HeapFree(GetProcessHeap(), 0, peer); } else { peer->message[dwtransfered] = 0; send(client, peer->message, dwtransfered, 0); memset(peer, 0, sizeof(PEERIO_OPERATION_DATA)); peer->buffer.len = SOCKET_MESSAGE_SIZE; peer->buffer.buf = peer->message; peer->operation_type = RECV_POSTED; WSARecv(client, &peer->buffer, 1, &peer->received_count, &peer->flags, &peer->overlap, nullptr); } } } } void completeio_server_manager::shutdown() { PostQueuedCompletionStatus(hcomplete_port, 0xFFFFFFFF, 0, NULL);//端口跟随数据。
brunning = FALSE; callback.shutdown();//清扫 CloseHandle(hcomplete_port); closesocket(server); WSACleanup(); bdisposed = true; } void completeio_server_manager::start_accept() { brunning = TRUE; bdisposed = false; callback.start_accept_by_winapi(); } void completeio_server_manager::start_receive() { brunning = TRUE; bdisposed = false; callback.start_receive(); } int main() { completeio_server_manager csm; csm.start_accept(); csm.start_receive(); printf("服务器启动成功。按随意键关闭服务器并退出程序。\n"); getchar(); csm.shutdown(); return 0; }
五、效果
六、心得体会
在此要记住的一个重点在于。在我们调用CreateIoCompletionPort时指定的并发线程数量,与打算创建的工作者线程数量相比,它们代表的并不是同一件事情。早些时候。我们曾建议大家用CreateIoCompletionPort函数为每一个处理器都指定一个线程(处理器的数量有多少,便指定多少线程)以避免因为频繁的线程“场景”交换活动,从而影响系统的总体性能。CreateIoCompletionPort函数的NumberOfConcurrentThreads參数明白指示系统:在一个完毕port上,一次仅仅同意n个工作者线程执行。假如在完毕port上创建的工作者线程数量超出n个。那么在同一时刻。最多仅仅同意n个线程执行。
但实际上,在一段较短的时间内,系统有可能超过这个值,但非常快便会把它降低至事先在CreateIoCompletionPort函数中设定的值。那么。为何实际创建的工作者线程数量有时要比CreateIoCompletionPort函数设定的多一些呢?这样做有必要吗?如先前所述。这主要取决于应用程序的整体设计情况。
假定我们的某个工作者线程调用了一个函数,比方Sleep或WaitForSingleObject,但却进入了暂停(锁定或挂起)状态。那么同意还有一个线程取代它的位置。换言之,我们希望随时都能运行尽可能多的线程。当然,最大的线程数量是事先在CreateIoCompletionPort调用里设定好的。
这样一来。假如事先估计到自己的线程有可能临时处于停顿状态,那么最好可以创建比CreateIoCompletionPort的NumberOfConcurrentThreads參数的值多的线程。以便到时候充分发挥系统的潜力。
一旦在完毕port上拥有足够多的工作者线程来为I/O请求提供服务,便可着手将套接字句柄同完毕port关联到一起。这要求我们在一个现有的完毕port上,调用CreateIoCompletionPort函数,同一时候为前三个參数——FileHandle,ExistingCompletionPort和CompletionKey——提供套接字的信息。当中, FileHandle參数指定一个要同完毕port关联在一起的套接字句柄。
ExistingCompletionPort參数指定的是一个现有的完毕port。
CompletionKey(完毕键)參数则指定要与某个特定套接字句柄关联在一起的“单句柄数据”。在这个參数中。应用程序可保存与一个套接字相应的随意类型的信息。之所以把它叫作“单句柄数据”。是因为它仅仅相应着与那个套接字句柄关联在一起的数据。
可将其作为指向一个数据结构的指针,来保存套接字句柄;在那个结构中。同一时候包括了套接字的句柄。以及与那个套接字有关的其它信息。