码迷,mamicode.com
首页 > 其他好文 > 详细

iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续

时间:2016-07-12 19:18:57      阅读:190      评论:0      收藏:0      [点我收藏+]

标签:

#pragma once
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#pragma comment(lib,"ws2_32.lib")

#define BUFFER_SIZE (1024*8) // 8KB
#define BUFFER_SIZE_DATA (3*BUFFER_SIZE )

#define NOTIFY_MSG_ACCEPT	0xa1
#define NOTIFY_MSG_CONNECT	0xa2
#define NOTIFY_MSG_DISCONNECT 0xa3
#define NOTIFY_MSG_READ		0xa4
#define NOTIFY_MSG_WRITE	0xa5


struct PER_IO_BUFFER
{
	WSAOVERLAPPED ol;
	SOCKET		sClient ;		//the socket of client use by AcceptEx  
	LPBYTE		lpBuffer;		// the pointer of buffer
	DWORD		dwBufferSize;	// the size of buffer
	DWORD		dwTrans	;		// the size of io-trans
	BYTE		opType;			// opteion type 
#define OP_ACCEPT	6
#define OP_CONNECT	7
#define OP_WRITE	8
#define OP_READ		9
	PER_IO_BUFFER* 	pNext;		// next buffer

};

struct PER_HANDLE_DATA
{
	SOCKET s;
	SOCKADDR_IN saddr;
	BOOL bConnect;
	BYTE readBytes[BUFFER_SIZE_DATA];
	DWORD dwBufferOffSet;
	HANDLE m_hWriteComplete ;
	//DWORD dwBufferSize;
	PER_HANDLE_DATA* pNext;
};

typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD );
class Ciocp
{
private:
	HANDLE	m_hIocp;			// iocp handle
	SOCKET	m_sListen;			// the socket of listen 
	SOCKET	m_sConnect;			// the socket of connect 
	DWORD	m_dwProt;			// the port of listen 
	DWORD	m_dwMaxConns;		// the max count of connectios
	DWORD	m_dwMaxFreeBuffers;	// the max count of freebuffers
	DWORD	m_dwMaxFreeContexts;// the max count of freecontexts
	DWORD	m_dwInitOp;			// the count of init-op
	ULONGLONG m_ulWriteBytes;	// the total of write-bytes 
	ULONGLONG m_ulReadBytes;	// the total of read-bytes

	DWORD m_dwWorkThreadCount ;	// the count of worker thread
	DWORD m_dwCurWorkThreadCount ;
	CRITICAL_SECTION m_csWorkLock ;
	CRITICAL_SECTION m_csIoLock ;

	CRITICAL_SECTION m_csBuffersListLock;		// the cs-lock of free-buffers-list 
	CRITICAL_SECTION m_csContextsListLock;		// the cs-lock of free-count
	DWORD			m_dwBuffersCount;			// the count of cur buffers
	DWORD			m_dwContextsCount;			// the count of cur contexts

	PER_IO_BUFFER* m_pFreeBuffersList;		// the list of free-buffers
	PER_HANDLE_DATA* m_pFreeContextsList;	// the list of free-contexts

	SOCKADDR_IN		m_siRemoteAddr ;
	bool			m_bStarted ;			// the status of socket server

	LPFN_ACCEPTEX				m_lpfnAcceptEx ;
	LPFN_GETACCEPTEXSOCKADDRS	m_lpfnGetAcceptExSockAddrs;
	LPFN_CONNECTEX				m_lpfnConnectEx ;
	PNOTIFYPROC					m_pNotifyProc;

	BOOL PostAccept();
	BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize);
	BOOL PostRead(PER_HANDLE_DATA* pContext);
	BOOL PostConnect();
	static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam);
	PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE);
	PER_HANDLE_DATA* AllocContext(SOCKET s);
	void ReleaseContext(PER_HANDLE_DATA* pContext);
	void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer);
	void CreateWokerThreads();
	void CreateIocp();
	void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0);
	void DecCurWorkCount();
	void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans);
	void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg);
	void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
	BOOL SetKeepAlive(SOCKET s);
	void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
	void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
public:
	Ciocp(void);
	~Ciocp(void);
	BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 );
	void Shutdown();
	BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 );
	void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite);
	void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize);
	void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);

};

  

#include "StdAfx.h"
#include "iocp.h"
#include <process.h>
#include <mstcpip.h>


Ciocp::Ciocp(void)
{

	m_hIocp = INVALID_HANDLE_VALUE ;
	m_sListen = INVALID_SOCKET ;
	m_sConnect = INVALID_SOCKET ;
	m_dwInitOp =5;
	m_dwMaxConns = 2000;
	m_dwMaxFreeContexts = 100;
	m_dwMaxFreeBuffers = 100 ;
	m_ulWriteBytes =0;
	m_ulReadBytes = 0;
	// get max worker count
	SYSTEM_INFO sys_info ;
	GetSystemInfo(&sys_info);
	m_dwWorkThreadCount = sys_info.dwNumberOfProcessors *2 ;
	m_dwCurWorkThreadCount = 0;
	InitializeCriticalSection(& m_csBuffersListLock);
	InitializeCriticalSection(& m_csContextsListLock);
	InitializeCriticalSection(& m_csWorkLock);
	InitializeCriticalSection((& m_csIoLock));
	m_dwBuffersCount = 0;
	m_dwContextsCount = 0;

	m_pFreeBuffersList = NULL ;
	m_pFreeContextsList = NULL ;

	m_lpfnAcceptEx = NULL ;
	m_lpfnConnectEx = NULL ;
	m_lpfnGetAcceptExSockAddrs = NULL ;
	m_bStarted = false ;

	m_dwProt = 0;
	WSADATA wsaData ;
	WORD sockVerSion = MAKEWORD(2,2);
	WSAStartup(sockVerSion,&wsaData);
}


Ciocp::~Ciocp(void)
{
	DeleteCriticalSection(& m_csBuffersListLock);
	DeleteCriticalSection(& m_csContextsListLock);
	DeleteCriticalSection(& m_csWorkLock);
	DeleteCriticalSection(& m_csIoLock);
	if (m_sListen != INVALID_SOCKET)
	{
		closesocket(m_sListen);
	}

	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
	}
}

BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */)
{
	m_pNotifyProc = pNotifyProc ;
	m_dwProt = dwPort;
	m_dwMaxConns = dwMaxConns;
	m_dwMaxFreeBuffers = dwMaxFreeBuffers;
	m_dwMaxFreeContexts = dwMaxFreeContexts;
	m_dwInitOp = dwInitOp ;

	m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
	SOCKADDR_IN saddr ;
	saddr.sin_family = AF_INET;
	saddr.sin_port = ntohs(m_dwProt);
	saddr.sin_addr.S_un.S_addr = INADDR_ANY ;
	m_bStarted = true ;

	if (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
	{
		m_bStarted = false ;
		return FALSE ;
	}

	listen(m_sListen,m_dwMaxConns);

	CreateIocp();

	GUID guidAcceptEx = WSAID_ACCEPTEX ;
	DWORD dwBytes ;
	WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL);

	GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ;

	WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL);

	AddSocketToIocp(m_sListen);
	//create worker
	CreateWokerThreads();
	
	// post accept 
	
	for(int i=0;i < m_dwInitOp;i++)
	{
		
		PostAccept();
	}


	
}

BOOL Ciocp::PostAccept()
{
	// set io type
	PER_IO_BUFFER* pBuffer = NULL ;
	pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE);
	pBuffer->opType = OP_ACCEPT ;

	// post io 
	DWORD dwBytes ;
	DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;
	pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
// 	BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
// 		pBuffer->lpBuffer,pBuffer->dwBufferSize-dwAddrSize*2 
// 		,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
	BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
		pBuffer->lpBuffer,0 
		,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
	if (!b && WSAGetLastError() != WSA_IO_PENDING)
	{
		return FALSE ;
	}

	return TRUE;


}

BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize)
{
	PER_IO_BUFFER* pBuffer = AllocBuffer();
	pBuffer->opType = OP_WRITE; 

	// post i/o 
	DWORD dwBytes;  
	DWORD dwFlags = 0;  
	WSABUF buf;  
	buf.buf = (char*)lpBuffer;  
	buf.len = dwSize  ;  
	if(::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
	{  
		if(::WSAGetLastError() != WSA_IO_PENDING)  
		{  

			return FALSE;  
		}  
	}  

	return TRUE ;
	
}

BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext)
{
	PER_IO_BUFFER* pBuffer = AllocBuffer();
	pBuffer->opType = OP_READ; 

	// post i/o 
	DWORD dwBytes;  
	DWORD dwFlags = 0;  
	WSABUF buf;  
	buf.buf = (char*)pBuffer->lpBuffer;  
	buf.len = pBuffer->dwBufferSize  ;  
	if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
	{  
		if(::WSAGetLastError() != WSA_IO_PENDING)  
		{  
			
			return FALSE;  
		}  
	}  

	return TRUE ;
}

BOOL Ciocp::PostConnect()
{
	PER_IO_BUFFER* pBuffer = NULL ;

	pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE);

	SOCKADDR_IN saddr ;
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(0);
	saddr.sin_addr.s_addr = htonl(ADDR_ANY);

	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
		m_sConnect = INVALID_SOCKET ;
	}
	m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);

	if (bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
	{
		m_bStarted = false ;
		return FALSE ;
	}
	
	
	PER_HANDLE_DATA *pContext = AllocContext(m_sConnect);
	pContext->bConnect = TRUE ;
	AddSocketToIocp(m_sConnect,pContext);
	//create worker
	
	if (m_lpfnConnectEx == NULL )
	{
		GUID guidConnectex = WSAID_CONNECTEX ;
		DWORD dwBytes ;
		WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL);
	}


	DWORD dwSend = 0;
	pBuffer->opType = OP_CONNECT ;
	strcpy((LPSTR)pBuffer->lpBuffer,"hello kid");
	pBuffer->dwBufferSize = 5;
	bool b = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr),
		pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,& pBuffer->ol);

	if(!b && ::WSAGetLastError() != WSA_IO_PENDING)  
	{  
		return FALSE;  
	}  

	return TRUE;  
}

PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize)
{
	OutputDebugString(L"Buffer ++ \r\n ");
	PER_IO_BUFFER* pBuffer = NULL;
	if (dwSize > BUFFER_SIZE)
	{
		return NULL ;
	}
	
	EnterCriticalSection(&m_csBuffersListLock);
	if (m_pFreeBuffersList == NULL )
	{
		// 2) HeapAlloc buffer 
		pBuffer = (PER_IO_BUFFER*) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE );
	}else
	{
		// 1) check pFreeBuffersList
		pBuffer = m_pFreeBuffersList ;
		m_pFreeBuffersList = m_pFreeBuffersList->pNext ;
		pBuffer->pNext = 0;
		m_dwBuffersCount -- ;
	}
	LeaveCriticalSection(&m_csBuffersListLock);
	// 
	if (pBuffer!= NULL )
	{
		pBuffer->dwBufferSize = dwSize ;
		pBuffer->lpBuffer =(LPBYTE) (pBuffer+1 );


	}
	return pBuffer ;
	
}

unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam)
{
	Sleep(1000);
	Ciocp* pThis = (Ciocp*)lpParam ;
	DWORD dwTrans = 0;
	DWORD dwKey = 0;
	LPOVERLAPPED lpol;
	PER_IO_BUFFER* pBuffer = NULL ;
	while (pThis->m_bStarted )
	{
		BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&dwKey,&lpol,WSA_INFINITE);
		if (dwTrans == -1 )
		{
			pThis->DecCurWorkCount();
			_endthreadex(0);
			return 0;
		}
		pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol);
		if (!bOk)
		{
			if (pBuffer->opType == OP_CONNECT)
			{
				DWORD dwError = WSAGetLastError();
				if (dwError != ERROR_IO_PENDING)
				{
					
					pThis->PostConnect();
					pThis->ReleaseContext((PER_HANDLE_DATA*)dwKey);
				}else
				{
					OutputDebugString(L"Connect Pending .... ");
				}
			}// (pBuffer->opType == OP_CONNECT)

			pThis->ReleaseIoBuffer(pBuffer);
		}else //(!bOk)
		{
			pThis->HandleIoOp((PER_HANDLE_DATA*)dwKey,pBuffer,dwTrans);
		}
	}
	pThis->DecCurWorkCount();
	return  WSAGetLastError();
}

void Ciocp::CreateWokerThreads()
{
	
	for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++)
	{
		unsigned threadid;
		_beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid);
		m_dwCurWorkThreadCount ++ ;
	}
}

void Ciocp::CreateIocp()
{
	if (m_hIocp == INVALID_HANDLE_VALUE)
	{
		m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
	}
}

void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/)
{
	CreateIoCompletionPort((HANDLE)s,m_hIocp,(DWORD)pContext,0);
}



BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */)
{
	m_pNotifyProc = pNotifyProc ;

	m_siRemoteAddr.sin_family = AF_INET;
	m_siRemoteAddr.sin_port = htons(dwPort);
	m_siRemoteAddr.sin_addr.S_un.S_addr = inet_addr(lpstrIp);
	
	

	//listen(m_sListen,m_dwMaxConns);

	CreateIocp();

	m_bStarted = true ;
	CreateWokerThreads();

	
	// post accept 
	

	return PostConnect();


}

PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s)
{
	PER_HANDLE_DATA *pContext = NULL ;
	OutputDebugString(L"Context ++ \r\n");
	

	EnterCriticalSection(&m_csContextsListLock);
	if (m_pFreeContextsList== NULL )
	{
		pContext = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA));
	}else
	{
		pContext = m_pFreeContextsList ;
		m_pFreeContextsList = pContext->pNext ;
		pContext->pNext = 0;
		m_dwContextsCount -- ;
	}
	LeaveCriticalSection(& m_csContextsListLock);

	if (pContext != NULL)
	{
		pContext->s = s ;
		pContext->m_hWriteComplete = CreateEvent(NULL,true,TRUE,NULL);
	}
	return pContext ;
}

void Ciocp::DecCurWorkCount()
{
	EnterCriticalSection(&m_csWorkLock);
	m_dwCurWorkThreadCount -- ;
	LeaveCriticalSection(&m_csWorkLock);
}



void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans)
{
	EnterCriticalSection(&m_csIoLock);
	
	switch(pBuffer->opType)
	{
	case OP_ACCEPT:
		//OutputDebugString(L"Accept \r\n");
		ProcessIoAccept(pContext,pBuffer,dwTrans);
		break;
	case OP_CONNECT:
		
		ProcessIoConnect(pContext,pBuffer,dwTrans);

		break;
	case OP_WRITE:
		//OutputDebugString(L"Write \r\n");
		ProcessIoWrite(pContext,pBuffer,dwTrans);
		break;
	case OP_READ:
		//OutputDebugString(L"Read \r\n");
		ProcessIoRead(pContext,pBuffer,dwTrans);
		break;
	default:
		OutputDebugString(L"HandleIoOp Default... \r\n");
		break;
	}
	// release pBuffer 
	//ReleaseIoBuffer(pBuffer);

	LeaveCriticalSection(&m_csIoLock);
}

void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans ==0 ) // read error 
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

		if (pContext->bConnect) // is connect 
		{
			
			PostConnect();
			
		}else
		{

		}
		ReleaseContext(pContext);


	}else
	{
		//  read ok notify main thread  
		pBuffer->dwTrans = dwTrans ;
		if (pContext->dwBufferOffSet+pBuffer->dwTrans > BUFFER_SIZE_DATA )
		{
			closesocket(pContext->s);
			if (pContext->bConnect)
			{
				PostConnect();
			}
			return ;
		}
		memcpy(pContext->readBytes+pContext->dwBufferOffSet,pBuffer->lpBuffer,pBuffer->dwTrans);
		pContext->dwBufferOffSet+= pBuffer->dwTrans ;

		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ);
		PostRead(pContext);
		
	}

	ReleaseIoBuffer(pBuffer);
	
}

void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg)
{
	if (m_pNotifyProc == NULL )
	{
		OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll \r\n");
	}
	if(!IsBadCodePtr((FARPROC)m_pNotifyProc))
	{
		m_pNotifyProc(pContext,pBuffer,dwMsg);
	}else
	{
		OutputDebugString(L" m_pNotifyProc is badcodeptr \r\n");
	}
}

void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext)
{
	OutputDebugString(L"Context -- \r\n");
	EnterCriticalSection(& m_csContextsListLock);
	CloseHandle(pContext->m_hWriteComplete);

	ZeroMemory(pContext,sizeof(PER_HANDLE_DATA));
	if (m_dwContextsCount > m_dwMaxFreeContexts)
	{
		HeapFree(GetProcessHeap(),0,pContext);
		LeaveCriticalSection(& m_csContextsListLock);
		return ;
	}else
	{
		
		pContext->pNext = m_pFreeContextsList;
		m_pFreeContextsList = pContext ;
		m_dwContextsCount ++ ;
		
	}
	LeaveCriticalSection(& m_csContextsListLock);
}

void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer)
{
	OutputDebugString(L"Buffer -- \r\n");
	EnterCriticalSection(& m_csBuffersListLock);
	ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+pBuffer->dwBufferSize);
	if (m_dwBuffersCount > m_dwMaxFreeBuffers)
	{
		HeapFree(GetProcessHeap(),0,pBuffer);
		LeaveCriticalSection(& m_csBuffersListLock);
		return ;
	}else
	{
		pBuffer->pNext = m_pFreeBuffersList ;
		m_pFreeBuffersList =pBuffer ;
		m_dwBuffersCount ++ ;
	}
	LeaveCriticalSection(& m_csBuffersListLock);

}

void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans == 0)
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
		ReleaseContext(pContext);
	}else
	{
		int nAddrLen = sizeof(SOCKADDR_IN);
		getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen);
		SetKeepAlive(pContext->s);
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT);
		//OutputDebugString(L"Connect \r\n");
		PostRead(pContext);
	}
	ReleaseIoBuffer(pBuffer);
}

void Ciocp::Shutdown()
{
	m_bStarted = false ;

	if (m_sListen != INVALID_SOCKET )
	{
		closesocket(m_sListen);
		m_sListen = INVALID_SOCKET ;

	}
	if (m_sConnect != INVALID_SOCKET)
	{
		closesocket(m_sConnect);
		m_sConnect = INVALID_SOCKET ;
	}

	while (m_dwCurWorkThreadCount > 0)
	{
		::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL);  
		Sleep(100);
	}

	PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ;
	while(pBuffer)
	{
		m_pFreeBuffersList = pBuffer->pNext ;
		HeapFree(GetProcessHeap(),0,pBuffer);
		pBuffer = m_pFreeBuffersList ;
	}
	m_dwBuffersCount = 0;
	PER_HANDLE_DATA* pContext = m_pFreeContextsList ;
	while(pContext)
	{
		m_pFreeContextsList = pContext->pNext ;
		HeapFree(GetProcessHeap(),0,pContext);
		pContext = m_pFreeContextsList ;
	}
	m_dwContextsCount = 0 ;
	
	
}

void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{

	LPSOCKADDR lpLocalAddr,lpRemoteAddr;
	int	nLocalAddr,nRemoteAddr;
	DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;

	PER_HANDLE_DATA* pContext1 = AllocContext(pBuffer->sClient);

	m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,pBuffer->dwBufferSize- 2*dwAddrSize,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr);
	memcpy(& (pContext1->saddr),lpRemoteAddr,nRemoteAddr);


	SetKeepAlive(pBuffer->sClient);

	AddSocketToIocp(pBuffer->sClient,pContext1);

	NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT);

	PostRead(pContext1);

	PostAccept();

	ReleaseIoBuffer(pBuffer);
}

BOOL Ciocp::SetKeepAlive(SOCKET s)
{
	BOOL bKeepAlive = TRUE;  
	int nRet = ::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive));  
	if (nRet == SOCKET_ERROR)
	{
		return false ;
	}else
	{
		tcp_keepalive alive_in = {0};
		tcp_keepalive alive_out = {0};

		alive_in.keepalivetime = 5000;
		alive_in.keepaliveinterval = 1000;
		alive_in.onoff = TRUE ;
		unsigned long ulBytesReturn = 0 ;
		nRet = WSAIoctl(s,SIO_KEEPALIVE_VALS,&alive_in,sizeof(alive_in),&alive_out,sizeof(alive_out),&ulBytesReturn,NULL,NULL);
		if (nRet == SOCKET_ERROR)
		{
			return FALSE ;
		}

	}

	return TRUE ;
}

void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite)
{
	*pulWrite = m_ulWriteBytes ;
	*pulRead = m_ulReadBytes ;
}

void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize)
{
	WaitForSingleObject(pContext->m_hWriteComplete,INFINITE);
	PostWrite(pContext,lpBuffer,dwSize);
}

void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
{
	if (dwTrans == 0)
	{
		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

		if (pContext->bConnect) // is connect 
		{

			PostConnect();

		}else
		{

		}
		ReleaseContext(pContext);
	}
	else
	{
		SetEvent(pContext->m_hWriteComplete);
	}

	ReleaseIoBuffer(pBuffer);
}

  

 

iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续

标签:

原文地址:http://www.cnblogs.com/M4ster/p/my_socket_iocp.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!