标签:
#pragma once
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#pragma comment(lib,"ws2_32.lib")
// Size in bytes of the payload area allocated directly behind every PER_IO_BUFFER header.
#define BUFFER_SIZE (1024*8) // 8KB
// Size in bytes of the per-connection receive accumulator (PER_HANDLE_DATA::readBytes).
#define BUFFER_SIZE_DATA (3*BUFFER_SIZE )
// Message codes passed as the DWORD argument of the PNOTIFYPROC callback.
// Raised after an incoming connection has been accepted.
#define NOTIFY_MSG_ACCEPT 0xa1
// Raised after an outgoing connection has been established.
#define NOTIFY_MSG_CONNECT 0xa2
// Raised when a read, write or connect completes with zero bytes transferred.
#define NOTIFY_MSG_DISCONNECT 0xa3
// Raised after received bytes were appended to PER_HANDLE_DATA::readBytes.
#define NOTIFY_MSG_READ 0xa4
// NOTE(review): not raised anywhere in the code visible in this file.
#define NOTIFY_MSG_WRITE 0xa5
// Per-operation state for one overlapped I/O request.
// The OVERLAPPED member is handed to Winsock and the enclosing record is
// recovered from the completion packet with CONTAINING_RECORD, so `ol` is what
// links a completion back to this structure. The payload area is allocated in
// the same heap block, immediately after the header (see Ciocp::AllocBuffer).
struct PER_IO_BUFFER
{
WSAOVERLAPPED ol;
SOCKET sClient ; // socket created for, and filled in by, an AcceptEx request
LPBYTE lpBuffer; // points at the payload area that follows this header
DWORD dwBufferSize; // number of payload bytes usable for this operation
DWORD dwTrans ; // bytes transferred by the completed operation (set for reads)
BYTE opType; // operation type, one of the OP_* values below
#define OP_ACCEPT 6
#define OP_CONNECT 7
#define OP_WRITE 8
#define OP_READ 9
PER_IO_BUFFER* pNext; // link used while the record sits in the free list
};
// Per-connection state. A pointer to this structure is registered as the
// completion key of the connection's socket (see Ciocp::AddSocketToIocp).
struct PER_HANDLE_DATA
{
SOCKET s; // the connection's socket
SOCKADDR_IN saddr; // address recorded when the connection was accepted/established
BOOL bConnect; // TRUE for the outgoing connection created by PostConnect
BYTE readBytes[BUFFER_SIZE_DATA]; // accumulator for received bytes
DWORD dwBufferOffSet; // number of valid bytes currently held in readBytes
// NOTE(review): nothing in this file resets dwBufferOffSet; presumably the
// notification callback consumes readBytes and rewinds it - confirm.
HANDLE m_hWriteComplete ; // manual-reset event, signalled when a write completes
//DWORD dwBufferSize;
PER_HANDLE_DATA* pNext; // link used while the record sits in the free list
};
// Application callback invoked by Ciocp::NotifyMsg.
// Arguments: the connection context, the I/O buffer of the operation that
// triggered the notification, and one of the NOTIFY_MSG_* codes.
typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD );
// I/O completion port based TCP endpoint.
// Start() runs it as a listening server (AcceptEx); Connect() runs it as a
// client (ConnectEx). Completions are dispatched by worker threads to the
// ProcessIo* handlers, which report events through the PNOTIFYPROC callback.
class Ciocp
{
private:
HANDLE m_hIocp; // completion port handle
SOCKET m_sListen; // listening socket (server mode)
SOCKET m_sConnect; // outgoing socket (client mode)
DWORD m_dwProt; // port the server listens on
DWORD m_dwMaxConns; // passed to listen() as the backlog
DWORD m_dwMaxFreeBuffers; // cap on the number of recycled I/O buffers kept
DWORD m_dwMaxFreeContexts;// cap on the number of recycled contexts kept
DWORD m_dwInitOp; // number of AcceptEx requests posted by Start()
ULONGLONG m_ulWriteBytes; // total bytes written, reported by GetStatisticsData
ULONGLONG m_ulReadBytes; // total bytes read, reported by GetStatisticsData
DWORD m_dwWorkThreadCount ; // target number of worker threads (2 x processors)
DWORD m_dwCurWorkThreadCount ; // worker threads currently accounted as running
CRITICAL_SECTION m_csWorkLock ; // held while the running-worker count is decremented
CRITICAL_SECTION m_csIoLock ; // held by HandleIoOp around completion dispatch
CRITICAL_SECTION m_csBuffersListLock; // guards the free I/O buffer list
CRITICAL_SECTION m_csContextsListLock; // guards the free context list
DWORD m_dwBuffersCount; // entries currently in the free I/O buffer list
DWORD m_dwContextsCount; // entries currently in the free context list
PER_IO_BUFFER* m_pFreeBuffersList; // head of the free I/O buffer list
PER_HANDLE_DATA* m_pFreeContextsList; // head of the free context list
SOCKADDR_IN m_siRemoteAddr ; // remote address used by PostConnect
bool m_bStarted ; // worker threads keep looping while this is true
LPFN_ACCEPTEX m_lpfnAcceptEx ; // AcceptEx, resolved through WSAIoctl
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs; // GetAcceptExSockaddrs, resolved through WSAIoctl
LPFN_CONNECTEX m_lpfnConnectEx ; // ConnectEx, resolved through WSAIoctl
PNOTIFYPROC m_pNotifyProc; // application callback
// Posts one overlapped AcceptEx on the listening socket.
BOOL PostAccept();
// Posts an overlapped WSASend of dwSize bytes from lpBuffer.
BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize);
// Posts an overlapped WSARecv into a freshly allocated I/O buffer.
BOOL PostRead(PER_HANDLE_DATA* pContext);
// Recreates m_sConnect and posts an overlapped ConnectEx to m_siRemoteAddr.
BOOL PostConnect();
// Worker thread entry point; lpParam is the owning Ciocp instance.
static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam);
// Returns a recycled or newly allocated I/O buffer; NULL if dwSize > BUFFER_SIZE.
PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE);
// Returns a recycled or newly allocated context bound to socket s.
PER_HANDLE_DATA* AllocContext(SOCKET s);
// Clears a context and recycles or frees it.
void ReleaseContext(PER_HANDLE_DATA* pContext);
// Clears an I/O buffer and recycles or frees it.
void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer);
// Starts the worker threads.
void CreateWokerThreads();
// Creates the completion port if it does not exist yet.
void CreateIocp();
// Associates socket s with the completion port, using pContext as completion key.
void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0);
// Decrements the running-worker count under m_csWorkLock.
void DecCurWorkCount();
// Dispatches a successful completion to the matching ProcessIo* handler.
void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans);
// Invokes the application callback with message code dwMsg.
void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg);
// Handles a completed read.
void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
// Enables TCP keep-alive (5000 ms idle time, 1000 ms interval) on socket s.
BOOL SetKeepAlive(SOCKET s);
// Handles a completed ConnectEx.
void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
// Handles a completed AcceptEx.
void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
public:
Ciocp(void);
~Ciocp(void);
// Starts listening on dwPort and posts dwInitOp accepts.
BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 );
// Stops the worker threads, closes the listen/connect sockets and frees the free lists.
void Shutdown();
// Starts an outgoing connection to lpstrIp:dwPort.
BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 );
// Copies the read/write byte totals to the caller.
void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite);
// Waits for the connection's write-complete event, then posts a write.
void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize);
// Handles a completed write.
void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
};
#include "StdAfx.h"
#include "iocp.h"
#include <process.h>
#include <mstcpip.h>
// Puts every member into a defined "not started" state, creates the locks and
// initializes Winsock 2.2 for the lifetime of the object.
Ciocp::Ciocp(void)
{
    m_hIocp = INVALID_HANDLE_VALUE;
    m_sListen = INVALID_SOCKET;
    m_sConnect = INVALID_SOCKET;
    m_dwProt = 0;
    m_dwInitOp = 5;
    m_dwMaxConns = 2000;
    m_dwMaxFreeContexts = 100;
    m_dwMaxFreeBuffers = 100;
    m_ulWriteBytes = 0;
    m_ulReadBytes = 0;
    m_bStarted = false;

    // Previously left uninitialized: NotifyMsg tests m_pNotifyProc against NULL,
    // and PostConnect passes m_siRemoteAddr to ConnectEx.
    m_pNotifyProc = NULL;
    ZeroMemory(&m_siRemoteAddr, sizeof(m_siRemoteAddr));

    // Worker pool size: two threads per processor.
    SYSTEM_INFO sys_info;
    GetSystemInfo(&sys_info);
    m_dwWorkThreadCount = sys_info.dwNumberOfProcessors * 2;
    m_dwCurWorkThreadCount = 0;

    InitializeCriticalSection(&m_csBuffersListLock);
    InitializeCriticalSection(&m_csContextsListLock);
    InitializeCriticalSection(&m_csWorkLock);
    InitializeCriticalSection(&m_csIoLock);

    m_dwBuffersCount = 0;
    m_dwContextsCount = 0;
    m_pFreeBuffersList = NULL;
    m_pFreeContextsList = NULL;

    m_lpfnAcceptEx = NULL;
    m_lpfnConnectEx = NULL;
    m_lpfnGetAcceptExSockAddrs = NULL;

    WSADATA wsaData;
    const WORD sockVerSion = MAKEWORD(2,2);
    if (WSAStartup(sockVerSion, &wsaData) != 0)
    {
        // A constructor cannot return an error; every later socket call will
        // fail, so at least leave a trace for the debugger.
        OutputDebugString(L"WSAStartup failed \r\n");
    }
}
// Stops the workers, then releases the completion port, the sockets, the locks
// and the Winsock reference taken by the constructor.
Ciocp::~Ciocp(void)
{
    // Workers use the critical sections and the port, so they must be gone
    // before any of those are destroyed. Shutdown() is safe to call twice.
    Shutdown();

    // CreateIoCompletionPort reports failure with NULL, the constructor uses
    // INVALID_HANDLE_VALUE as "not created"; treat both as "nothing to close".
    if (m_hIocp != NULL && m_hIocp != INVALID_HANDLE_VALUE)
    {
        CloseHandle(m_hIocp);
        m_hIocp = INVALID_HANDLE_VALUE;
    }
    if (m_sListen != INVALID_SOCKET)
    {
        closesocket(m_sListen);
        m_sListen = INVALID_SOCKET;
    }
    if (m_sConnect != INVALID_SOCKET)
    {
        closesocket(m_sConnect);
        m_sConnect = INVALID_SOCKET;
    }

    DeleteCriticalSection(&m_csBuffersListLock);
    DeleteCriticalSection(&m_csContextsListLock);
    DeleteCriticalSection(&m_csWorkLock);
    DeleteCriticalSection(&m_csIoLock);

    // Balances the WSAStartup call made in the constructor.
    WSACleanup();
}
// Starts the server: binds and listens on dwPort, creates the completion port,
// resolves AcceptEx/GetAcceptExSockaddrs, starts the worker threads and posts
// dwInitOp overlapped accepts.
//
// pNotifyProc       callback that receives NOTIFY_MSG_* events
// dwPort            TCP port to listen on (must fit in 16 bits)
// dwMaxConns        passed to listen() as the backlog
// dwMaxFreeBuffers  cap for the recycled I/O buffer list
// dwMaxFreeContexts cap for the recycled context list
// dwInitOp          number of AcceptEx requests to keep posted initially
//
// Returns TRUE when the server is listening, FALSE if any setup step failed
// (the listening socket is closed again in that case).
BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */)
{
    m_pNotifyProc = pNotifyProc;
    m_dwProt = dwPort;
    m_dwMaxConns = dwMaxConns;
    m_dwMaxFreeBuffers = dwMaxFreeBuffers;
    m_dwMaxFreeContexts = dwMaxFreeContexts;
    m_dwInitOp = dwInitOp;
    m_bStarted = false;

    if (dwPort > 0xFFFF)
    {
        return FALSE;
    }

    m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if (m_sListen == INVALID_SOCKET)
    {
        return FALSE;
    }

    SOCKADDR_IN saddr;
    ZeroMemory(&saddr, sizeof(saddr));                 // also clears sin_zero
    saddr.sin_family = AF_INET;
    saddr.sin_port = htons((u_short)m_dwProt);         // host -> network order
    saddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

    BOOL bReady = TRUE;
    if (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
    {
        bReady = FALSE;
    }
    if (bReady && listen(m_sListen,(int)m_dwMaxConns) == SOCKET_ERROR)
    {
        bReady = FALSE;
    }

    if (bReady)
    {
        CreateIocp();
        if (m_hIocp == NULL || m_hIocp == INVALID_HANDLE_VALUE)
        {
            bReady = FALSE;
        }
    }

    if (bReady)
    {
        // The extension functions are not exported; they have to be looked up
        // at run time through the listening socket.
        DWORD dwBytes = 0;
        GUID guidAcceptEx = WSAID_ACCEPTEX;
        GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
        if (WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL) == SOCKET_ERROR
            || WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL) == SOCKET_ERROR
            || m_lpfnAcceptEx == NULL
            || m_lpfnGetAcceptExSockAddrs == NULL)
        {
            bReady = FALSE;
        }
    }

    if (!bReady)
    {
        closesocket(m_sListen);
        m_sListen = INVALID_SOCKET;
        return FALSE;
    }

    AddSocketToIocp(m_sListen);

    // Workers loop on m_bStarted, so it has to be set before they are created.
    m_bStarted = true;
    CreateWokerThreads();

    // Individual PostAccept failures are not treated as fatal here.
    for (DWORD i = 0; i < m_dwInitOp; ++i)
    {
        PostAccept();
    }
    return TRUE;
}
// Posts one overlapped AcceptEx on the listening socket.
// The accept socket is created up front and stored in the I/O buffer; no
// payload is requested with the accept (receive length 0), only the two
// address blocks are written to the buffer.
// Returns TRUE if the request is pending or completed, FALSE on failure, in
// which case the socket and the buffer are released again.
BOOL Ciocp::PostAccept()
{
    if (m_lpfnAcceptEx == NULL)
    {
        return FALSE;
    }

    PER_IO_BUFFER* pBuffer = AllocBuffer(BUFFER_SIZE);
    if (pBuffer == NULL)
    {
        return FALSE;
    }
    pBuffer->opType = OP_ACCEPT;

    pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if (pBuffer->sClient == INVALID_SOCKET)
    {
        ReleaseIoBuffer(pBuffer);
        return FALSE;
    }

    DWORD dwBytes = 0;
    const DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16;
    const BOOL bAccepted = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
        pBuffer->lpBuffer,0,
        dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
    if (!bAccepted && WSAGetLastError() != WSA_IO_PENDING)
    {
        // No completion will be queued for a request that failed outright, so
        // this is the only place where the resources can be reclaimed.
        closesocket(pBuffer->sClient);
        ReleaseIoBuffer(pBuffer);
        return FALSE;
    }
    return TRUE;
}
// Posts an overlapped WSASend of dwSize bytes on the connection's socket.
// When the data fits into the I/O buffer it is copied there first, so the
// caller's memory does not have to stay valid until the send completes.
// Larger payloads are sent straight from lpBuffer, which the caller must then
// keep alive until the write completion has been processed.
// Returns TRUE if the send is pending or completed, FALSE on failure.
BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize)
{
    if (pContext == NULL || lpBuffer == NULL)
    {
        return FALSE;
    }

    PER_IO_BUFFER* pBuffer = AllocBuffer();
    if (pBuffer == NULL)
    {
        return FALSE;
    }
    pBuffer->opType = OP_WRITE;

    WSABUF buf;
    if (dwSize <= pBuffer->dwBufferSize)
    {
        memcpy(pBuffer->lpBuffer, lpBuffer, dwSize);
        buf.buf = (char*)pBuffer->lpBuffer;
    }
    else
    {
        buf.buf = (char*)lpBuffer;
    }
    buf.len = dwSize;

    DWORD dwBytes = 0;
    const DWORD dwFlags = 0;
    if (::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR
        && ::WSAGetLastError() != WSA_IO_PENDING)
    {
        // The request was rejected, so no completion packet will ever hand
        // this buffer back to a worker thread.
        ReleaseIoBuffer(pBuffer);
        return FALSE;
    }
    return TRUE;
}
// Posts an overlapped WSARecv on the connection's socket into a freshly
// allocated I/O buffer.
// Returns TRUE if the receive is pending or completed, FALSE on failure.
BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext)
{
    if (pContext == NULL)
    {
        return FALSE;
    }

    PER_IO_BUFFER* pBuffer = AllocBuffer();
    if (pBuffer == NULL)
    {
        return FALSE;
    }
    pBuffer->opType = OP_READ;

    WSABUF buf;
    buf.buf = (char*)pBuffer->lpBuffer;
    buf.len = pBuffer->dwBufferSize;

    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    if (::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR
        && ::WSAGetLastError() != WSA_IO_PENDING)
    {
        // The request was rejected, so no completion packet will ever hand
        // this buffer back to a worker thread.
        ReleaseIoBuffer(pBuffer);
        return FALSE;
    }
    return TRUE;
}
// (Re)creates the outgoing socket and posts an overlapped ConnectEx to
// m_siRemoteAddr. Any previous m_sConnect is closed first. The first 5 bytes
// of "hello kid" are sent together with the connect, so a successful
// completion reports a non-zero transfer count.
// Returns TRUE if the connect is pending or completed, FALSE on failure; all
// resources allocated for the attempt are released again in that case.
// A socket-creation or bind failure additionally clears m_bStarted.
BOOL Ciocp::PostConnect()
{
    if (m_sConnect != INVALID_SOCKET)
    {
        closesocket(m_sConnect);
        m_sConnect = INVALID_SOCKET;
    }

    // ConnectEx requires a socket that is already bound; port 0 lets the
    // system choose the local port.
    SOCKADDR_IN saddr;
    ZeroMemory(&saddr, sizeof(saddr));
    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(0);
    saddr.sin_addr.s_addr = htonl(ADDR_ANY);

    m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if (m_sConnect == INVALID_SOCKET
        || bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
    {
        m_bStarted = false;
        return FALSE;
    }

    if (m_lpfnConnectEx == NULL)
    {
        GUID guidConnectex = WSAID_CONNECTEX;
        DWORD dwBytes = 0;
        if (WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL) == SOCKET_ERROR
            || m_lpfnConnectEx == NULL)
        {
            m_lpfnConnectEx = NULL;
            return FALSE;
        }
    }

    PER_HANDLE_DATA* pContext = AllocContext(m_sConnect);
    if (pContext == NULL)
    {
        return FALSE;
    }
    pContext->bConnect = TRUE;
    AddSocketToIocp(m_sConnect,pContext);

    PER_IO_BUFFER* pBuffer = AllocBuffer(BUFFER_SIZE);
    if (pBuffer == NULL)
    {
        ReleaseContext(pContext);
        return FALSE;
    }
    pBuffer->opType = OP_CONNECT;

    static const char szGreeting[] = "hello kid";
    memcpy(pBuffer->lpBuffer, szGreeting, sizeof(szGreeting));
    pBuffer->dwBufferSize = 5;

    DWORD dwSend = 0;
    const BOOL bConnected = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr),
        pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,&pBuffer->ol);
    if (!bConnected && ::WSAGetLastError() != WSA_IO_PENDING)
    {
        // Nothing is pending on the socket, so no completion will reference
        // the buffer or the context.
        ReleaseIoBuffer(pBuffer);
        ReleaseContext(pContext);
        return FALSE;
    }
    return TRUE;
}
// Hands out an I/O buffer whose payload area follows the header in the same
// heap block. A recycled buffer is preferred; a new one is heap-allocated
// (zero-filled, always with BUFFER_SIZE payload bytes) only when the free list
// is empty. dwSize is recorded as the usable payload size.
// Returns NULL when dwSize exceeds BUFFER_SIZE or the allocation fails.
PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize)
{
    OutputDebugString(L"Buffer ++ \r\n ");
    if (dwSize > BUFFER_SIZE)
    {
        return NULL;
    }

    PER_IO_BUFFER* pResult = NULL;

    EnterCriticalSection(&m_csBuffersListLock);
    if (m_pFreeBuffersList != NULL)
    {
        // Pop the head of the free list.
        pResult = m_pFreeBuffersList;
        m_pFreeBuffersList = pResult->pNext;
        pResult->pNext = 0;
        m_dwBuffersCount--;
    }
    else
    {
        pResult = (PER_IO_BUFFER*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE);
    }
    LeaveCriticalSection(&m_csBuffersListLock);

    if (pResult == NULL)
    {
        return NULL;
    }

    // The payload starts right behind the header.
    pResult->lpBuffer = (LPBYTE)(pResult + 1);
    pResult->dwBufferSize = dwSize;
    return pResult;
}
// Worker thread: dequeues completion packets and dispatches them.
//
// lpParam is the owning Ciocp instance. The thread leaves when it dequeues the
// shutdown packet posted by Shutdown() (transfer count -1, no OVERLAPPED),
// when the port itself fails, or when m_bStarted is found cleared at the top
// of the loop. Successful completions go to HandleIoOp; failed ones only have
// their resources reclaimed (and, for a failed connect, a retry is posted).
unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam)
{
    Sleep(1000);
    Ciocp* pThis = (Ciocp*)lpParam;

    while (pThis->m_bStarted)
    {
        DWORD dwTrans = 0;
        ULONG_PTR ulKey = 0;          // completion keys are pointer sized
        LPOVERLAPPED lpol = NULL;
        const BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&ulKey,&lpol,WSA_INFINITE);

        if (lpol == NULL)
        {
            if (bOk && dwTrans == (DWORD)-1)
            {
                // Shutdown request.
                pThis->DecCurWorkCount();
                return 0;
            }
            if (!bOk)
            {
                // Nothing was dequeued: the port is unusable, so looping would
                // only spin. There is no I/O buffer to recover either.
                break;
            }
            // A packet without OVERLAPPED that is not the shutdown marker
            // carries nothing this class can process.
            continue;
        }

        PER_IO_BUFFER* pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol);
        PER_HANDLE_DATA* pContext = (PER_HANDLE_DATA*)ulKey;

        if (bOk)
        {
            pThis->HandleIoOp(pContext,pBuffer,dwTrans);
            continue;
        }

        // The I/O operation itself failed.
        if (pBuffer->opType == OP_CONNECT)
        {
            DWORD dwError = WSAGetLastError();
            if (dwError != ERROR_IO_PENDING)
            {
                // Do not reconnect once shutdown has begun; Shutdown() has
                // just closed the socket on purpose.
                if (pThis->m_bStarted)
                {
                    pThis->PostConnect();
                }
                pThis->ReleaseContext(pContext);
            }
            else
            {
                OutputDebugString(L"Connect Pending .... ");
            }
        }
        else if (pBuffer->opType == OP_ACCEPT)
        {
            // The socket created for this accept never became a connection.
            if (pBuffer->sClient != INVALID_SOCKET)
            {
                closesocket(pBuffer->sClient);
            }
        }
        pThis->ReleaseIoBuffer(pBuffer);
    }

    pThis->DecCurWorkCount();
    return WSAGetLastError();
}
void Ciocp::CreateWokerThreads()
{
for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++)
{
unsigned threadid;
_beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid);
m_dwCurWorkThreadCount ++ ;
}
}
void Ciocp::CreateIocp()
{
if (m_hIocp == INVALID_HANDLE_VALUE)
{
m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
}
}
// Associates socket s with the completion port. pContext becomes the
// completion key delivered with every completion for that socket; it is
// passed as ULONG_PTR so the pointer is not truncated in 64-bit builds.
void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/)
{
    CreateIoCompletionPort((HANDLE)s,m_hIocp,(ULONG_PTR)pContext,0);
}
// Starts the client side: records the remote endpoint, creates the completion
// port and the worker threads, and posts the first ConnectEx.
//
// pNotifyProc  callback that receives NOTIFY_MSG_* events
// lpstrIp      dotted-decimal IPv4 address of the peer
// dwPort       TCP port of the peer (must fit in 16 bits)
//
// Returns FALSE for a missing/unparsable address, an out-of-range port, a
// completion port that could not be created, or a failed PostConnect().
BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */)
{
    if (lpstrIp == NULL || dwPort > 0xFFFF)
    {
        return FALSE;
    }
    const unsigned long ulAddr = inet_addr(lpstrIp);
    if (ulAddr == INADDR_NONE)
    {
        return FALSE;
    }

    m_pNotifyProc = pNotifyProc;

    ZeroMemory(&m_siRemoteAddr, sizeof(m_siRemoteAddr));   // also clears sin_zero
    m_siRemoteAddr.sin_family = AF_INET;
    m_siRemoteAddr.sin_port = htons((u_short)dwPort);
    m_siRemoteAddr.sin_addr.S_un.S_addr = ulAddr;

    CreateIocp();
    if (m_hIocp == NULL || m_hIocp == INVALID_HANDLE_VALUE)
    {
        return FALSE;
    }

    // Workers loop on m_bStarted, so it has to be set before they are created.
    m_bStarted = true;
    CreateWokerThreads();

    return PostConnect();
}
// Hands out a connection context bound to socket s. A recycled context is
// preferred; a new zero-filled one is heap-allocated only when the free list
// is empty. Every context gets a fresh manual-reset event that starts out
// signalled. Returns NULL if the allocation fails.
PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s)
{
    PER_HANDLE_DATA* pResult = NULL;
    OutputDebugString(L"Context ++ \r\n");

    EnterCriticalSection(&m_csContextsListLock);
    if (m_pFreeContextsList != NULL)
    {
        // Pop the head of the free list.
        pResult = m_pFreeContextsList;
        m_pFreeContextsList = pResult->pNext;
        pResult->pNext = 0;
        m_dwContextsCount--;
    }
    else
    {
        pResult = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA));
    }
    LeaveCriticalSection(&m_csContextsListLock);

    if (pResult == NULL)
    {
        return NULL;
    }

    pResult->s = s;
    pResult->m_hWriteComplete = CreateEvent(NULL,true,TRUE,NULL);
    return pResult;
}
// Records that one worker thread has finished: lowers the running-worker
// count by one while holding m_csWorkLock.
void Ciocp::DecCurWorkCount()
{
    EnterCriticalSection(&m_csWorkLock);
    m_dwCurWorkThreadCount -= 1;
    LeaveCriticalSection(&m_csWorkLock);
}
// Routes a successfully completed operation to the handler that matches the
// buffer's opType. The whole dispatch runs under m_csIoLock, so handlers never
// execute concurrently. The handlers themselves release the I/O buffer.
void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans)
{
    EnterCriticalSection(&m_csIoLock);

    const BYTE op = pBuffer->opType;
    if (op == OP_ACCEPT)
    {
        ProcessIoAccept(pContext,pBuffer,dwTrans);
    }
    else if (op == OP_CONNECT)
    {
        ProcessIoConnect(pContext,pBuffer,dwTrans);
    }
    else if (op == OP_WRITE)
    {
        ProcessIoWrite(pContext,pBuffer,dwTrans);
    }
    else if (op == OP_READ)
    {
        ProcessIoRead(pContext,pBuffer,dwTrans);
    }
    else
    {
        // Unknown operation type: only leave a trace.
        OutputDebugString(L"HandleIoOp Default... \r\n");
    }

    LeaveCriticalSection(&m_csIoLock);
}
// Handles a completed read.
//
// A zero-byte completion means the peer closed the connection. A completion
// that would overflow the per-connection accumulator means the application has
// not drained readBytes; the connection is dropped in that case as well. Both
// cases now share one teardown: notify NOTIFY_MSG_DISCONNECT, close the socket
// (or reconnect for the outgoing connection), release the context and the I/O
// buffer. The overflow path used to leak both and never closed accepted
// sockets.
//
// Otherwise the bytes are appended to readBytes, NOTIFY_MSG_READ is raised and
// the next read is posted.
// NOTE(review): dwBufferOffSet is only ever advanced here; presumably the
// callback consumes readBytes and rewinds the offset - confirm.
void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
    pBuffer->dwTrans = dwTrans;

    const bool bPeerClosed = (dwTrans == 0);
    const bool bOverflow = !bPeerClosed
        && (pContext->dwBufferOffSet + dwTrans > BUFFER_SIZE_DATA);

    if (bPeerClosed || bOverflow)
    {
        NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
        if (pContext->bConnect)
        {
            // PostConnect closes the old outgoing socket before reconnecting.
            PostConnect();
        }
        else
        {
            closesocket(pContext->s);
        }
        ReleaseContext(pContext);
        ReleaseIoBuffer(pBuffer);
        return;
    }

    m_ulReadBytes += dwTrans;   // feeds GetStatisticsData

    memcpy(pContext->readBytes + pContext->dwBufferOffSet, pBuffer->lpBuffer, dwTrans);
    pContext->dwBufferOffSet += dwTrans;
    NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ);
    PostRead(pContext);

    ReleaseIoBuffer(pBuffer);
}
// Forwards an event to the application callback, if one is registered.
// dwMsg is one of the NOTIFY_MSG_* codes.
// The former IsBadCodePtr probe was removed: the API is obsolete and
// Microsoft advises against using it; a plain NULL test is what is needed.
void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg)
{
    if (m_pNotifyProc == NULL)
    {
        OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll \r\n");
        return;
    }
    m_pNotifyProc(pContext,pBuffer,dwMsg);
}
// Returns a context to the pool: closes its write-complete event, clears the
// whole record and either keeps it in the free list or frees it when the list
// already holds m_dwMaxFreeContexts entries. The socket is not closed here.
// A NULL pointer is ignored.
void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext)
{
    if (pContext == NULL)
    {
        return;
    }
    OutputDebugString(L"Context -- \r\n");

    // Cleanup of the record itself does not touch shared state, so it is done
    // before the list lock is taken.
    if (pContext->m_hWriteComplete != NULL)
    {
        CloseHandle(pContext->m_hWriteComplete);
    }
    ZeroMemory(pContext,sizeof(PER_HANDLE_DATA));

    bool bRecycled = false;
    EnterCriticalSection(&m_csContextsListLock);
    // "<" keeps the list at no more than the configured maximum; the previous
    // ">" test let it grow to one entry more than that.
    if (m_dwContextsCount < m_dwMaxFreeContexts)
    {
        pContext->pNext = m_pFreeContextsList;
        m_pFreeContextsList = pContext;
        m_dwContextsCount++;
        bRecycled = true;
    }
    LeaveCriticalSection(&m_csContextsListLock);

    if (!bRecycled)
    {
        HeapFree(GetProcessHeap(),0,pContext);
    }
}
// Returns an I/O buffer to the pool: clears header and payload and either
// keeps it in the free list or frees it when the list already holds
// m_dwMaxFreeBuffers entries. A NULL pointer is ignored.
void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer)
{
    if (pBuffer == NULL)
    {
        return;
    }
    OutputDebugString(L"Buffer -- \r\n");

    // Every buffer is allocated with BUFFER_SIZE payload bytes, so clear
    // exactly that much. The record's own dwBufferSize is not a safe length:
    // callers change it (PostConnect sets it to 5), which left stale payload
    // bytes in recycled buffers and would overrun the block if it were ever
    // set too large.
    ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+BUFFER_SIZE);

    bool bRecycled = false;
    EnterCriticalSection(&m_csBuffersListLock);
    // "<" keeps the list at no more than the configured maximum; the previous
    // ">" test let it grow to one entry more than that.
    if (m_dwBuffersCount < m_dwMaxFreeBuffers)
    {
        pBuffer->pNext = m_pFreeBuffersList;
        m_pFreeBuffersList = pBuffer;
        m_dwBuffersCount++;
        bRecycled = true;
    }
    LeaveCriticalSection(&m_csBuffersListLock);

    if (!bRecycled)
    {
        HeapFree(GetProcessHeap(),0,pBuffer);
    }
}
// Handles a completed ConnectEx.
//
// PostConnect sends a few bytes together with the connect, so a transfer
// count of zero is treated as a failed connection: NOTIFY_MSG_DISCONNECT is
// raised and the context released. Otherwise the socket is switched to the
// normal connected state, its local address is recorded, keep-alive is
// enabled, NOTIFY_MSG_CONNECT is raised and the first read is posted.
void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
    if (dwTrans == 0)
    {
        NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
        ReleaseContext(pContext);
    }
    else
    {
        // Required after ConnectEx: until this option is set the socket does
        // not honour previously set options and functions such as
        // getpeername/shutdown do not work on it.
        setsockopt(pContext->s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

        // NOTE(review): this stores the local address, whereas the accept path
        // stores the remote one in the same field - confirm which is intended.
        int nAddrLen = sizeof(SOCKADDR_IN);
        getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen);

        SetKeepAlive(pContext->s);
        NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT);
        PostRead(pContext);
    }
    ReleaseIoBuffer(pBuffer);
}
void Ciocp::Shutdown()
{
m_bStarted = false ;
if (m_sListen != INVALID_SOCKET )
{
closesocket(m_sListen);
m_sListen = INVALID_SOCKET ;
}
if (m_sConnect != INVALID_SOCKET)
{
closesocket(m_sConnect);
m_sConnect = INVALID_SOCKET ;
}
while (m_dwCurWorkThreadCount > 0)
{
::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL);
Sleep(100);
}
PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ;
while(pBuffer)
{
m_pFreeBuffersList = pBuffer->pNext ;
HeapFree(GetProcessHeap(),0,pBuffer);
pBuffer = m_pFreeBuffersList ;
}
m_dwBuffersCount = 0;
PER_HANDLE_DATA* pContext = m_pFreeContextsList ;
while(pContext)
{
m_pFreeContextsList = pContext->pNext ;
HeapFree(GetProcessHeap(),0,pContext);
pContext = m_pFreeContextsList ;
}
m_dwContextsCount = 0 ;
}
// Handles a completed AcceptEx.
//
// Creates a context for the accepted socket, records the peer address,
// enables keep-alive, registers the socket with the completion port, raises
// NOTIFY_MSG_ACCEPT, posts the first read on the new connection and posts a
// replacement accept. pContext (the listening socket's key) and dwTrans are
// not used.
void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
    const SOCKET sClient = pBuffer->sClient;
    const DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16;

    // Required after AcceptEx so the accepted socket inherits the listening
    // socket's properties and works with the regular socket functions.
    setsockopt(sClient, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&m_sListen, sizeof(m_sListen));

    PER_HANDLE_DATA* pContext1 = AllocContext(sClient);
    if (pContext1 == NULL)
    {
        // Without a context the connection cannot be served; drop it but keep
        // the server accepting.
        closesocket(sClient);
        PostAccept();
        ReleaseIoBuffer(pBuffer);
        return;
    }

    // The receive length must be the value that was passed to AcceptEx, which
    // PostAccept posts with 0. Any other value makes the address blocks be
    // read from the wrong offset in the buffer.
    LPSOCKADDR lpLocalAddr = NULL;
    LPSOCKADDR lpRemoteAddr = NULL;
    int nLocalAddr = 0;
    int nRemoteAddr = 0;
    m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,0,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr);
    if (lpRemoteAddr != NULL && nRemoteAddr > 0)
    {
        size_t cbCopy = (size_t)nRemoteAddr;
        if (cbCopy > sizeof(pContext1->saddr))
        {
            cbCopy = sizeof(pContext1->saddr);
        }
        memcpy(&(pContext1->saddr), lpRemoteAddr, cbCopy);
    }

    SetKeepAlive(sClient);
    AddSocketToIocp(sClient,pContext1);
    NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT);
    PostRead(pContext1);
    PostAccept();
    ReleaseIoBuffer(pBuffer);
}
// Turns on TCP keep-alive for socket s and sets the probe timing to
// 5000 ms idle time and 1000 ms between probes.
// Returns FALSE as soon as either socket call fails, TRUE otherwise.
BOOL Ciocp::SetKeepAlive(SOCKET s)
{
    BOOL bEnable = TRUE;
    if (::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bEnable, sizeof(bEnable)) == SOCKET_ERROR)
    {
        return FALSE;
    }

    tcp_keepalive settings = {0};
    tcp_keepalive previous = {0};
    settings.onoff = TRUE;
    settings.keepalivetime = 5000;
    settings.keepaliveinterval = 1000;

    unsigned long ulReturned = 0;
    const int nResult = WSAIoctl(s,SIO_KEEPALIVE_VALS,&settings,sizeof(settings),&previous,sizeof(previous),&ulReturned,NULL,NULL);
    return (nResult == SOCKET_ERROR) ? FALSE : TRUE;
}
// Copies the byte counters to the caller.
// pulRead receives m_ulReadBytes, pulWrite receives m_ulWriteBytes; either
// pointer may be NULL if that value is not wanted.
void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite)
{
    if (pulWrite != NULL)
    {
        *pulWrite = m_ulWriteBytes;
    }
    if (pulRead != NULL)
    {
        *pulRead = m_ulReadBytes;
    }
}
// Queues dwSize bytes from lpBuffer for sending on the given connection.
// Waits on the connection's write-complete event first, then posts the write.
// Calls with a NULL context, a NULL buffer or a zero length are ignored: a
// zero-length send completes with a transfer count of 0, which the write
// completion handler interprets as a disconnect and tears the connection
// down.
// NOTE(review): nothing in this file ever resets m_hWriteComplete, so the
// wait does not currently serialize writes - confirm the intended protocol.
void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize)
{
    if (pContext == NULL || lpBuffer == NULL || dwSize == 0)
    {
        return;
    }
    WaitForSingleObject(pContext->m_hWriteComplete,INFINITE);
    if (!PostWrite(pContext,lpBuffer,dwSize))
    {
        OutputDebugString(L"Send: PostWrite failed \r\n");
    }
}
// Handles a completed write.
//
// A zero-byte completion is treated as a lost connection:
// NOTIFY_MSG_DISCONNECT is raised, the socket is closed (or, for the outgoing
// connection, replaced by a new connect attempt) and the context is released.
// Otherwise the written bytes are added to the statistics and the
// connection's write-complete event is signalled.
// The I/O buffer is released in both cases.
void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
    if (dwTrans == 0)
    {
        NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
        if (pContext->bConnect)
        {
            // PostConnect closes the old outgoing socket before reconnecting.
            PostConnect();
        }
        else
        {
            // Accepted sockets are owned by this class and were never closed
            // on this path before.
            closesocket(pContext->s);
        }
        ReleaseContext(pContext);
    }
    else
    {
        m_ulWriteBytes += dwTrans;   // feeds GetStatisticsData
        SetEvent(pContext->m_hWriteComplete);
    }
    ReleaseIoBuffer(pBuffer);
}
iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续
标签:
原文地址:http://www.cnblogs.com/M4ster/p/my_socket_iocp.html