
A TCP Packet Framing and Splitting Library Based on libevent


TCP/IP is convenient, but because TCP is a stream-based transport (UDP, by contrast, is datagram-based), every project sooner or later has to solve the problem of splitting the incoming byte stream back into discrete packets.

 

In past projects, every programmer wrote their own framing logic, and the implementations varied in both approach and stability. Having finally gotten the chance to build a common baseline, I wrote a framing library of my own on top of libevent.

 

This post pastes in some of the core content.

// The callback interface

         

class ITcpPacketNotify
{
public:
    virtual void OnConnected(int fd) = 0;
    virtual void OnDisConnected(int fd) = 0;
    virtual void OnTimeOutError(int fd) = 0;
    virtual void OnBuffOverFlow(int fd) = 0;
    // Extract a packet: the business layer reports the parsed packet length, or asks to
    // discard some invalid bytes; return true once a full packet has been parsed.
    virtual bool OnAnalyzePacket(int fd, const char* buff, int bufflen, int& packetlen, int& ignore) = 0;
    // Packet-handling callback: to send a reply, fill the respond/respondlen parameters
    // directly; the length must not exceed 40960.
    virtual void OnPacketArrived(int fd, const char* packet, int packetlen, char* respond, int& respondlen) = 0;
};

Two kinds of error notification are provided. What differs from other libraries is that the business layer has to implement the framing logic itself; after all, every project's wire protocol is different. Once a frame has been extracted, the packet-arrived notification fires.

 

// Extract a packet: the business layer reports the parsed packet length, or asks to
// discard some invalid bytes; return true once a full packet has been parsed.
//  1B    1B   2B         4B        NB    2B   2B
//  HEAD  CMD  FRAME_SEQ  DATA_LEN  DATA  CRC  TAIL

virtual bool OnAnalyzePacket(int fd, const char* buff, int bufflen, int& packetlen, int& ignore)
{
    // A complete frame is at least 12 bytes (8-byte header + 2-byte CRC + 2-byte tail).
    if (bufflen < 12)
    {
        return false;
    }
    // Resynchronize: if the first byte is not the frame head, skip ahead to the next '!'.
    if (buff[0] != '!')
    {
        ignore = bufflen;  // no frame head found anywhere: discard everything buffered
        for (int i = 1; i < bufflen; i++)
        {
            if (buff[i] == '!')
            {
                ignore = i;
                break;
            }
        }
        return false;
    }

    // Data length (big-endian on the wire)
    int length = 0;
    memcpy((void*)&length, (void*)&buff[4], 4);
    length = ntohl(length);
    if (bufflen < length + 12)
    {
        return false;  // the frame has not fully arrived yet
    }

    packetlen = length + 12;
    return true;
}

      

The above is a framing example for one particular protocol.
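
The parsing side has a natural counterpart: building a frame to send. The sketch below is illustrative only, assuming the same layout; the BuildFrame helper, the '#' tail bytes, and the zeroed CRC are my assumptions, since the post does not specify how the tail and checksum are produced.

#include <winsock2.h> // htons/htonl
#include <cstring>
#include <vector>

// Hypothetical helper (not part of the library): packs one frame in the
// HEAD/CMD/FRAME_SEQ/DATA_LEN/DATA/CRC/TAIL layout parsed above.
static std::vector<char> BuildFrame(char cmd, unsigned short seq,
                                    const char* data, int datalen)
{
    std::vector<char> frame(12 + datalen);
    frame[0] = '!';                           // HEAD, as checked in OnAnalyzePacket
    frame[1] = cmd;                           // CMD
    unsigned short nseq = htons(seq);
    memcpy(&frame[2], &nseq, 2);              // FRAME_SEQ (network byte order assumed)
    unsigned int nlen = htonl((unsigned int)datalen);
    memcpy(&frame[4], &nlen, 4);              // DATA_LEN, matching the ntohl() above
    if (datalen > 0)
        memcpy(&frame[8], data, datalen);     // DATA
    unsigned short crc = 0;                   // CRC stub: algorithm not given in the post
    memcpy(&frame[8 + datalen], &crc, 2);
    frame[10 + datalen] = '#';                // TAIL bytes: placeholders, not specified
    frame[11 + datalen] = '#';
    return frame;
}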

 

typedef struct _TcpPacketConfig
{
    int _port;              // server port
    short _workNum;         // number of worker threads
    unsigned int _connNum;  // connections per worker thread
    int _readTimeOut;       // read timeout (seconds)
    int _writeTimeOut;      // write timeout (seconds)

    _TcpPacketConfig()
    {
        _connNum = 100;
        _workNum = 5;
        _readTimeOut = 120;
        _writeTimeOut = 120;
        _port = 8000;
    }

} TcpPacketConfig;

class ITcpPacketManager
{
public:
    virtual bool Start(TcpPacketConfig& config, ITcpPacketNotify* notify) = 0;
    virtual void Stop() = 0;
    virtual bool SendPacket(int fd, const char* packet, int packetlen) = 0;
};

TCPPACKET_API ITcpPacketManager* CreateTcpPacketManager();

TCPPACKET_API void DestroyTcpPacketManager(ITcpPacketManager* manager);

These are the externally exposed interface functions.
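
To make the interface concrete, here is a minimal usage sketch. EchoNotify, the trivial framing logic, and the header name TcpPacket.h are illustrative assumptions, not code from the library:

#include "TcpPacket.h" // assumed name of the library's public header
#include <cstring>
#include <cstdio>

// Illustrative notify implementation that echoes every packet back.
class EchoNotify : public ITcpPacketNotify
{
public:
    void OnConnected(int fd) { printf("connected: %d\n", fd); }
    void OnDisConnected(int fd) { printf("disconnected: %d\n", fd); }
    void OnTimeOutError(int fd) { printf("timeout: %d\n", fd); }
    void OnBuffOverFlow(int fd) { printf("overflow: %d\n", fd); }
    bool OnAnalyzePacket(int fd, const char* buff, int bufflen,
                         int& packetlen, int& ignore)
    {
        packetlen = bufflen; // trivial framing: treat everything buffered as one packet
        return bufflen > 0;
    }
    void OnPacketArrived(int fd, const char* packet, int packetlen,
                         char* respond, int& respondlen)
    {
        memcpy(respond, packet, packetlen); // echo; stays under the 40960-byte limit
        respondlen = packetlen;
    }
};

int main()
{
    EchoNotify notify;
    TcpPacketConfig config; // defaults: port 8000, 5 workers, 100 connections each
    ITcpPacketManager* mgr = CreateTcpPacketManager();
    if (mgr->Start(config, &notify))
    {
        getchar(); // serve until a key is pressed
        mgr->Stop();
    }
    DestroyTcpPacketManager(mgr);
    return 0;
}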

 

bool CTcpPacketImp::Start(TcpPacketConfig& config, ITcpPacketNotify* notify)
{
    return m_libEvent.StartServer(config._port, config._workNum, config._connNum,
                                  config._readTimeOut, config._writeTimeOut, notify);
}

void CTcpPacketImp::Stop()
{
    m_libEvent.StopServer();
}

bool CTcpPacketImp::SendPacket(int fd, const char* packet, int packetlen)
{
    return m_libEvent.SendPacket(fd, packet, packetlen);
}

All three simply forward to the m_libEvent implementation.
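
The factory functions' bodies are not shown in the post; a plausible sketch, assuming CTcpPacketImp is the only concrete implementation:

// Sketch only: these bodies do not appear in the post.
TCPPACKET_API ITcpPacketManager* CreateTcpPacketManager()
{
    return new CTcpPacketImp();
}

TCPPACKET_API void DestroyTcpPacketManager(ITcpPacketManager* manager)
{
    // ITcpPacketManager has no virtual destructor, so delete through the
    // concrete type to run CTcpPacketImp's destructor.
    delete static_cast<CTcpPacketImp*>(manager);
}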

 

The most central part of the implementation follows.

Some data definitions:

#include <event2/bufferevent.h>
#include <event2/bufferevent_compat.h>
#include <event2/buffer.h>
#include <event2/buffer_compat.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/http_struct.h>
#include <event2/thread.h>

 

struct _Conn;
struct _Worker;
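// Note: emMaxBuffLen is used by the structures below but its definition is not
// included in this excerpt; 40960 is assumed here, matching the 40960-byte
// respond-buffer limit documented on OnPacketArrived.
enum { emMaxBuffLen = 40960 };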

// Server state
struct _Server
{
    bool bStart;
    short nPort;
    short workernum;
    unsigned int connnum;
    volatile int nCurrentWorker;
    int read_timeout;
    int write_timeout;
    struct evconnlistener *pListener;
    struct event_base *pBase;
    HANDLE hThread;
    _Worker *pWorker;
};
// Free list of connection objects
struct _ConnList
{
    _ConnList()
    {
        head = NULL;
        tail = NULL;
        plistConn = NULL;
    }
    _Conn *head;
    _Conn *tail;
    _Conn *plistConn;
};
// Connection object
struct _Conn
{
    _Conn()
    {
        fd = -1;  // was NULL in the original; -1 is the invalid-socket value used elsewhere
        bufev = NULL;
        index = -1;
        in_buf_len = 0;
        out_buf_len = 0;
        owner = NULL;
        next = NULL;
        in_buf = new char[emMaxBuffLen];
        out_buf = new char[emMaxBuffLen];
    }
    ~_Conn()
    {
        delete[] in_buf;
        delete[] out_buf;
        if (bufev)
            bufferevent_free(bufev);
    }
    struct bufferevent *bufev;
    evutil_socket_t fd;
    int index;
    char *in_buf;
    int in_buf_len;
    char *out_buf;
    int out_buf_len;
    _Worker *owner;
    _Conn *next;
};
// Worker-thread state; free connections live in a singly linked list where
// head == tail means the list is empty (the tail node acts as a sentinel).
struct _Worker
{
    _Worker()
    {
        pWokerbase = NULL;
        hThread = INVALID_HANDLE_VALUE;
        pListConn = NULL;
    }
    struct event_base *pWokerbase;
    HANDLE hThread;
    _ConnList *pListConn;
    inline _Conn* GetFreeConn()
    {
        _Conn *pItem = NULL;
        if (pListConn->head != pListConn->tail)
        {
            pItem = pListConn->head;
            pListConn->head = pListConn->head->next;
        }
        return pItem;
    }
    inline void PutFreeConn(_Conn *pItem)
    {
        pListConn->tail->next = pItem;
        pListConn->tail = pItem;
    }
};

typedef struct _Server Server;
typedef struct _Worker Worker;
typedef struct _Conn Conn;
typedef struct _ConnList ConnList;

The header file:

class CLibEvent
{
public:
    CLibEvent(void);
    ~CLibEvent(void);
private:
    // Current server object
    Server m_Server;
public:
    bool StartServer(int port, short workernum, unsigned int connnum,
                     int read_timeout, int write_timeout, ITcpPacketNotify* notify);
    void StopServer();
    bool SendPacket(int fd, const char* packet, int packetlen);
private:
    static void DoAccept(struct evconnlistener *listener, evutil_socket_t fd,
                         struct sockaddr *sa, int socklen, void *user_data);
    static void DoError(struct bufferevent *bev, short error, void *ctx);
    static void CloseConn(Conn *pConn);
    static void DoRead(struct bufferevent *bev, void *ctx);
    static DWORD WINAPI ThreadServer(LPVOID lPVOID);
    static DWORD WINAPI ThreadWorkers(LPVOID lPVOID);

    static ITcpPacketNotify *m_notify;
};

The implementation (.cpp):

#include "StdAfx.h"
#include "LibEvent.h"

#include <string>
#include <iostream>
using namespace std;

#include <assert.h>
#include <signal.h>

#include <WinSock2.h>


CLibEvent::CLibEvent(void)
{
    ZeroMemory(&m_Server, sizeof(m_Server));
    WSADATA WSAData;
    WSAStartup(0x0201, &WSAData);
}

CLibEvent::~CLibEvent(void)
{
    WSACleanup();
}

bool CLibEvent::StartServer(int port, short workernum, unsigned int connnum,
                            int read_timeout, int write_timeout, ITcpPacketNotify* notify)
{
    m_notify = notify;
    m_Server.bStart = false;
    m_Server.nCurrentWorker = 0;
    m_Server.nPort = port;
    m_Server.workernum = workernum;
    m_Server.connnum = connnum;
    m_Server.read_timeout = read_timeout;
    m_Server.write_timeout = write_timeout;
    evthread_use_windows_threads();
    m_Server.pBase = event_base_new();
    if (m_Server.pBase == NULL)
    {
        return false;
    }
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(m_Server.nPort);
    m_Server.pListener = evconnlistener_new_bind(m_Server.pBase, DoAccept, (void*)&m_Server,
                                                 LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
                                                 (struct sockaddr*)&sin, sizeof(sin));
    if (m_Server.pListener == NULL)
    {
        return false;
    }

    m_Server.pWorker = new Worker[workernum];
    for (int i = 0; i < workernum; i++)
    {
        m_Server.pWorker[i].pWokerbase = event_base_new();
        if (m_Server.pWorker[i].pWokerbase == NULL)
        {
            delete[] m_Server.pWorker;
            return false;
        }
        // Initialize this worker's connection objects
        {
            m_Server.pWorker[i].pListConn = new ConnList();
            if (m_Server.pWorker[i].pListConn == NULL)
            {
                return false;
            }
            // connnum+1 nodes: the extra node serves as the free-list sentinel
            m_Server.pWorker[i].pListConn->plistConn = new Conn[m_Server.connnum + 1];
            m_Server.pWorker[i].pListConn->head = &m_Server.pWorker[i].pListConn->plistConn[0];
            m_Server.pWorker[i].pListConn->tail = &m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum];
            for (unsigned int j = 0; j < m_Server.connnum; j++)
            {
                m_Server.pWorker[i].pListConn->plistConn[j].index = j;
                m_Server.pWorker[i].pListConn->plistConn[j].next = &m_Server.pWorker[i].pListConn->plistConn[j + 1];
            }
            m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].index = m_Server.connnum;
            m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].next = NULL;
            // Pre-create a bufferevent for every connection slot
            Conn *p = m_Server.pWorker[i].pListConn->head;
            while (p != NULL)
            {
                p->bufev = bufferevent_socket_new(m_Server.pWorker[i].pWokerbase, -1,
                                                  BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
                if (p->bufev == NULL)
                {
                    return false;
                }
                bufferevent_setcb(p->bufev, DoRead, NULL, DoError, p);
                bufferevent_setwatermark(p->bufev, EV_READ, 0, 0); // use the default high/low watermarks
                bufferevent_enable(p->bufev, EV_READ | EV_WRITE);
                struct timeval delayWriteTimeout;
                delayWriteTimeout.tv_sec = m_Server.write_timeout;
                delayWriteTimeout.tv_usec = 0;
                struct timeval delayReadTimeout;
                delayReadTimeout.tv_sec = m_Server.read_timeout;
                delayReadTimeout.tv_usec = 0;
                //bufferevent_set_timeouts(p->bufev, &delayReadTimeout, &delayWriteTimeout);
                p->owner = &m_Server.pWorker[i];
                p = p->next;
            }
        }
        m_Server.pWorker[i].hThread = CreateThread(NULL, 0, ThreadWorkers, &m_Server.pWorker[i], 0, NULL);
    }
    m_Server.hThread = CreateThread(NULL, 0, ThreadServer, &m_Server, 0, NULL);
    if (m_Server.hThread == NULL)
    {
        return false;
    }
    m_Server.bStart = true;
    return true;
}

void CLibEvent::StopServer()
{
    if (m_Server.bStart)
    {
        struct timeval delay = { 2, 0 };
        event_base_loopexit(m_Server.pBase, &delay);
        WaitForSingleObject(m_Server.hThread, INFINITE);
        if (m_Server.pWorker)
        {
            for (int i = 0; i < m_Server.workernum; i++)
            {
                event_base_loopexit(m_Server.pWorker[i].pWokerbase, &delay);
                WaitForSingleObject(m_Server.pWorker[i].hThread, INFINITE);
            }
            for (int i = 0; i < m_Server.workernum; i++)
            {
                if (m_Server.pWorker[i].pListConn)
                {
                    delete[] m_Server.pWorker[i].pListConn->plistConn;
                    delete m_Server.pWorker[i].pListConn;
                    m_Server.pWorker[i].pListConn = NULL;
                }
                event_base_free(m_Server.pWorker[i].pWokerbase);
            }
            delete[] m_Server.pWorker;
            m_Server.pWorker = NULL;
        }
        evconnlistener_free(m_Server.pListener);
        event_base_free(m_Server.pBase);
    }
    m_Server.bStart = false;
}

void CLibEvent::DoRead(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input = bufferevent_get_input(bev);
    if (evbuffer_get_length(input))
    {
        Conn *c = (Conn*)ctx;
        while (evbuffer_get_length(input))
        {
            // If emMaxBuffLen bytes are buffered and still not consumed, give up.
            if (c->in_buf_len >= emMaxBuffLen)
            {
                m_notify->OnBuffOverFlow(c->fd);
                CloseConn(c);
                return;
            }

            // Copy from libevent's buffer into the Conn buffer, capped at emMaxBuffLen.
            c->in_buf_len += evbuffer_remove(input, c->in_buf + c->in_buf_len, emMaxBuffLen - c->in_buf_len);
            // Hand the data to the business layer for framing.
            while (true)
            {
                int packlen = 0, ignore = 0;
                bool bRet = m_notify->OnAnalyzePacket(c->fd, c->in_buf, c->in_buf_len, packlen, ignore);
                if (!bRet) // the business layer may ask to discard some garbage bytes
                {
                    if (ignore > 0)
                    {
                        c->in_buf_len -= ignore; // the buffer shrinks
                        memmove(c->in_buf, c->in_buf + ignore, c->in_buf_len);
                    }
                    else
                    {
                        // Parsing failed, usually because the frame is incomplete;
                        // leave this loop and keep reading buffered data.
                        break;
                    }
                }
                else
                {
                    if (packlen > c->in_buf_len)
                    {
                        // The business layer reported a length beyond what is buffered.
                        break;
                    }
                    // A full packet was parsed; notify the business layer.
                    m_notify->OnPacketArrived(c->fd, c->in_buf, packlen, c->out_buf, c->out_buf_len);
                    if (c->out_buf_len != 0)
                    {
                        // Send the reply.
                        struct evbuffer *output = bufferevent_get_output(bev);
                        evbuffer_add(output, c->out_buf, c->out_buf_len);
                        c->out_buf_len = 0;
                    }
                    // Remove the consumed packet from the buffer.
                    c->in_buf_len -= packlen; // the buffer shrinks
                    memmove(c->in_buf, c->in_buf + packlen, c->in_buf_len);
                }
            }
        }
    }
}

void CLibEvent::CloseConn(Conn *pConn)
{
    pConn->in_buf_len = 0;
    m_notify->OnDisConnected(pConn->fd);
    bufferevent_disable(pConn->bufev, EV_READ | EV_WRITE);
    evutil_closesocket(pConn->fd);
    pConn->owner->PutFreeConn(pConn);
}

void CLibEvent::DoError(struct bufferevent *bev, short error, void *ctx)
{
    Conn *c = (Conn*)ctx;
    // EVBUFFER_* are the compat-era names (from bufferevent_compat.h) for BEV_EVENT_*.
    if (error & EVBUFFER_TIMEOUT)
    {
        m_notify->OnTimeOutError(c->fd);
    }
    else if (error & EVBUFFER_ERROR)
    {
        m_notify->OnBuffOverFlow(c->fd);
    }
    CloseConn(c);
}

void CLibEvent::DoAccept(struct evconnlistener *listener, evutil_socket_t fd,
                         struct sockaddr *sa, int socklen, void *user_data)
{
    // Runs on the listener thread; the actual work is handed off.
    Server *pServer = (Server *)user_data;
    // Round-robin dispatch across the worker threads.
    int nCurrent = pServer->nCurrentWorker++ % pServer->workernum;
    Worker &pWorker = pServer->pWorker[nCurrent];
    // Take a free connection object from the chosen worker; attaching the socket
    // to its pre-created bufferevent makes that worker's event loop handle it.
    Conn *pConn = pWorker.GetFreeConn();
    if (pConn == NULL)
    {
        return;
    }
    pConn->fd = fd;
    evutil_make_socket_nonblocking(pConn->fd);
    bufferevent_setfd(pConn->bufev, pConn->fd);
    // Notify the business layer of the new connection.
    m_notify->OnConnected(pConn->fd);
    bufferevent_enable(pConn->bufev, EV_READ | EV_WRITE);
}

DWORD WINAPI CLibEvent::ThreadServer(LPVOID lPVOID)
{
    Server *pServer = reinterpret_cast<Server *>(lPVOID);
    if (pServer == NULL)
    {
        return -1;
    }
    event_base_dispatch(pServer->pBase);
    return GetCurrentThreadId();
}

DWORD WINAPI CLibEvent::ThreadWorkers(LPVOID lPVOID)
{
    Worker *pWorker = reinterpret_cast<Worker *>(lPVOID);
    if (pWorker == NULL)
    {
        return -1;
    }
    event_base_dispatch(pWorker->pWokerbase);
    return GetCurrentThreadId();
}

bool CLibEvent::SendPacket(int fd, const char* packet, int packetlen)
{
    // This could use improvement: send() may accept fewer bytes than requested,
    // though some documentation says the lower layers fragment and send automatically.
    int nRet = send(fd, packet, packetlen, 0);
    if (nRet < packetlen)
    {
        return false;
    }

    return true;
}
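
Since DoAccept makes every socket nonblocking, send() really can return fewer bytes than requested, or fail with WSAEWOULDBLOCK. Below is a minimal retry-loop sketch; the helper name SendAll is illustrative, and writing through the connection's bufferevent with bufferevent_write would be the more idiomatic libevent fix.

#include <winsock2.h>

// Illustrative helper (not part of the library): loops until the whole buffer
// is written or a hard error occurs. Busy-waiting on WSAEWOULDBLOCK is kept
// only for brevity; a real implementation would queue the remainder instead.
static bool SendAll(int fd, const char* buf, int len)
{
    int sent = 0;
    while (sent < len)
    {
        int n = send(fd, buf + sent, len - sent, 0);
        if (n > 0)
        {
            sent += n;
        }
        else if (n == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
        {
            continue; // kernel send buffer full: try again
        }
        else
        {
            return false; // connection reset or other hard error
        }
    }
    return true;
}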

ITcpPacketNotify * CLibEvent::m_notify=NULL;


Original post: http://www.cnblogs.com/xuhuajie/p/7435540.html
