#include "StdAfx.h" #include "LibEvent.h"
#include <string> #include <iostream> using namespace std;
#include <assert.h> #include <signal.h>
#include <WinSock2.h>
// Build an idle server object: bring up Winsock and clear the server record.
CLibEvent::CLibEvent(void)
{
    // Request Winsock with version word 0x0201. The result is not inspected
    // here; a constructor has no way to report it to the caller.
    WSADATA wsaData;
    WSAStartup(0x0201, &wsaData);

    // Every byte of m_Server starts at zero, so bStart reads as false and the
    // pointer members read as NULL until StartServer fills them in.
    ZeroMemory(&m_Server, sizeof(m_Server));
}
// Tear the object down: stop the server if it is still running, then release
// the Winsock reference taken in the constructor.
CLibEvent::~CLibEvent(void)
{
    // The accept thread and the worker threads are handed pointers into
    // m_Server. Destroying the object while they are still dispatching would
    // leave them running on freed memory, so make sure they are stopped first.
    // StopServer() does nothing when the server is not started.
    StopServer();

    WSACleanup();
}
bool CLibEvent::StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout,ITcpPacketNotify* notify) { m_notify = notify; m_Server.bStart=false; m_Server.nCurrentWorker=0; m_Server.nPort=port; m_Server.workernum=workernum; m_Server.connnum=connnum; m_Server.read_timeout=read_timeout; m_Server.write_timeout=write_timeout; evthread_use_windows_threads(); m_Server.pBase=event_base_new(); if (m_Server.pBase==NULL) { return false; } struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(m_Server.nPort); m_Server.pListener=evconnlistener_new_bind(m_Server.pBase,DoAccept,(void*)&m_Server,LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,-1,(struct sockaddr*)&sin,sizeof(sin)); if (m_Server.pListener==NULL) { return false; } m_Server.pWorker=new Worker[workernum]; for (int i=0;i<workernum;i++) { m_Server.pWorker[i].pWokerbase=event_base_new(); if (m_Server.pWorker[i].pWokerbase== NULL) { delete []m_Server.pWorker; return false; } //初始化连接对象 { m_Server.pWorker[i].pListConn=new ConnList(); if (m_Server.pWorker[i].pListConn==NULL) { return false; } m_Server.pWorker[i].pListConn->plistConn=new Conn[m_Server.connnum+1]; m_Server.pWorker[i].pListConn->head=&m_Server.pWorker[i].pListConn->plistConn[0]; m_Server.pWorker[i].pListConn->tail=&m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum]; for (int j=0; j<m_Server.connnum; j++) { m_Server.pWorker[i].pListConn->plistConn[j].index=j; m_Server.pWorker[i].pListConn->plistConn[j].next=&m_Server.pWorker[i].pListConn->plistConn[j+1]; } m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].index=m_Server.connnum; m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].next=NULL; //设置当前事件 Conn *p=m_Server.pWorker[i].pListConn->head; while (p!=NULL) { p->bufev=bufferevent_socket_new(m_Server.pWorker[i].pWokerbase,-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); if (p->bufev==NULL) { return false; } bufferevent_setcb(p->bufev, DoRead, NULL, DoError, p); 
bufferevent_setwatermark(p->bufev, EV_READ, 0, 0); //使用默认的高低水位 bufferevent_enable(p->bufev, EV_READ|EV_WRITE); struct timeval delayWriteTimeout; delayWriteTimeout.tv_sec=m_Server.write_timeout; delayWriteTimeout.tv_usec=0; struct timeval delayReadTimeout; delayReadTimeout.tv_sec=m_Server.read_timeout; delayReadTimeout.tv_usec=0; //bufferevent_set_timeouts(p->bufev,&delayReadTimeout,&delayWriteTimeout); p->owner=&m_Server.pWorker[i]; p=p->next; } } m_Server.pWorker[i].hThread=CreateThread(NULL,0,ThreadWorkers,&m_Server.pWorker[i],0,NULL); } m_Server.hThread=CreateThread(NULL,0,ThreadServer,&m_Server,0,NULL); if (m_Server.hThread==NULL) { return false; } m_Server.bStart=true; return true; }
void CLibEvent::StopServer() { if (m_Server.bStart) { struct timeval delay = { 2, 0 }; event_base_loopexit(m_Server.pBase, &delay); WaitForSingleObject(m_Server.hThread,INFINITE); if (m_Server.pWorker) { for (int i=0;i<m_Server.workernum;i++) { event_base_loopexit(m_Server.pWorker[i].pWokerbase, &delay); WaitForSingleObject(m_Server.pWorker[i].hThread,INFINITE); } for (int i=0;i<m_Server.workernum;i++) { if (m_Server.pWorker[i].pListConn) { delete []m_Server.pWorker[i].pListConn->plistConn; delete m_Server.pWorker[i].pListConn; m_Server.pWorker[i].pListConn=NULL; } event_base_free(m_Server.pWorker[i].pWokerbase); } delete[]m_Server.pWorker; m_Server.pWorker=NULL; } evconnlistener_free(m_Server.pListener); event_base_free(m_Server.pBase); } m_Server.bStart=false; }
void CLibEvent::DoRead(struct bufferevent *bev, void *ctx) { struct evbuffer * input=bufferevent_get_input(bev); if (evbuffer_get_length(input)) { Conn *c = (Conn*) ctx; while (evbuffer_get_length(input)) { //超过emMaxBuffLen还没有被消费掉,无能为力了。 if (c->in_buf_len >= emMaxBuffLen) { m_notify->OnBuffOverFlow(c->fd); CloseConn(c); return; }
//拷贝缓冲池的内存到Conn,最大缓冲不超过emMaxBuffLen c->in_buf_len += evbuffer_remove(input, c->in_buf + c->in_buf_len, emMaxBuffLen - c->in_buf_len); //抛给业务层去解析包 while (true) { int packlen = 0, ignore = 0; bool bRet = m_notify->OnAnalyzePacket(c->fd, c->in_buf,c->in_buf_len,packlen, ignore); if (!bRet) //可能要舍弃一些脏数据 { if (ignore > 0) { c->in_buf_len -= ignore; //缓冲长度变少 memmove(c->in_buf, c->in_buf + ignore, c->in_buf_len); } else { //解析包失败了,往往是长度不够,跳出此循环继续读缓冲数据 break; } } else { if (packlen>c->in_buf_len) { //用户解析的时候未考虑长度 break; } //解析成功,通知业务层处理 m_notify->OnPacketArrived(c->fd, c->in_buf, packlen,c->out_buf,c->out_buf_len); if (c->out_buf_len !=0) { //回复报文 struct evbuffer * output = bufferevent_get_output(bev); evbuffer_add(output, c->out_buf, c->out_buf_len); //移除数据 c->out_buf_len = 0; } //移除这个包文 c->in_buf_len -= packlen; //缓冲长度变少 memmove(c->in_buf, c->in_buf + packlen, c->in_buf_len); } }
}
} }
// Close one connection and hand its slot back to the owning worker's pool.
//
// Raises OnDisConnected, disables the slot's bufferevent, discards whatever is
// still buffered for it, closes the socket and returns the slot via
// PutFreeConn.
//
// Parameters:
//   pConn - the connection slot to close.
void CLibEvent::CloseConn(Conn *pConn)
{
    pConn->in_buf_len = 0;
    m_notify->OnDisConnected(pConn->fd);
    bufferevent_disable(pConn->bufev, EV_READ | EV_WRITE);

    // The bufferevent is kept with the slot and is re-attached to a new socket
    // in DoAccept. Anything left in its buffers would otherwise be parsed as
    // input from, or written to, the next client that gets this slot.
    struct evbuffer *input = bufferevent_get_input(pConn->bufev);
    evbuffer_drain(input, evbuffer_get_length(input));
    struct evbuffer *output = bufferevent_get_output(pConn->bufev);
    evbuffer_drain(output, evbuffer_get_length(output));

    evutil_closesocket(pConn->fd);
    pConn->owner->PutFreeConn(pConn);
}
void CLibEvent::DoError(struct bufferevent *bev, short error, void *ctx) { Conn *c=(Conn*)ctx; if (error&EVBUFFER_TIMEOUT) { m_notify->OnTimeOutError(c->fd); }else if (error&EVBUFFER_ERROR) { m_notify->OnBuffOverFlow(c->fd); } CloseConn(c); }
void CLibEvent::DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data) { //此处为监听线程的event.不做处理. Server *pServer = (Server *)user_data; //主线程处做任务分发. int nCurrent=pServer->nCurrentWorker++%pServer->workernum; //当前线程所在ID号 Worker &pWorker=pServer->pWorker[nCurrent]; //通知线程开始读取数据,用于分配哪一个线程来处理此处的event事件 Conn *pConn=pWorker.GetFreeConn(); if (pConn==NULL) { return; } pConn->fd=fd; evutil_make_socket_nonblocking(pConn->fd); bufferevent_setfd(pConn->bufev, pConn->fd); //转发发送事件 m_notify->OnConnected(pConn->fd); bufferevent_enable(pConn->bufev, EV_READ | EV_WRITE); }
// Thread entry point for the accept loop.
//
// Parameters:
//   lPVOID - the Server whose pBase should be dispatched.
//
// Returns (DWORD)-1 when no Server was supplied; otherwise the id of the
// calling thread once event_base_dispatch has returned.
DWORD WINAPI CLibEvent::ThreadServer(LPVOID lPVOID)
{
    Server *server = static_cast<Server *>(lPVOID);
    if (server != NULL)
    {
        // Runs until the loop on pBase ends.
        event_base_dispatch(server->pBase);
        return GetCurrentThreadId();
    }
    return static_cast<DWORD>(-1);
}
// Thread entry point for one worker loop.
//
// Parameters:
//   lPVOID - the Worker whose pWokerbase should be dispatched.
//
// Returns (DWORD)-1 when no Worker was supplied; otherwise the id of the
// calling thread once event_base_dispatch has returned.
DWORD WINAPI CLibEvent::ThreadWorkers(LPVOID lPVOID)
{
    Worker *worker = static_cast<Worker *>(lPVOID);
    if (worker != NULL)
    {
        // Runs until the loop on this worker's base ends.
        event_base_dispatch(worker->pWokerbase);
        return GetCurrentThreadId();
    }
    return static_cast<DWORD>(-1);
}
bool CLibEvent::SendPacket(int fd, const char* packet, int packetlen) { //这里可能需要优化,但是有些文档说底层会自动分帧发送 int nRet = send(fd, packet, packetlen, 0); if (nRet<packetlen) { return false; }
return true; }
// Callback sink shared by all static libevent handlers; assigned in
// StartServer and NULL until then.
ITcpPacketNotify* CLibEvent::m_notify = NULL;
|