标签:style blog io color ar os sp for 数据
服务器发包到客户端 以登录包为例 SendCmd(s2c_login, &ret, sizeof(LoginEnum)); end_stat BaseChannel::SendCmd(int nCmd, void* pData, int nLen) { Protocol Ptl; Ptl.cmd_type = nCmd; Ptl.content = pData; Ptl.size = nLen; void* tmpBuffer = g_SendCmdBuffer; int nBufferLen = 1024 * 100; int nSize = Ptl.to_buffer(tmpBuffer, 1024 * 100);//打包到g_SendCmdBuffer if(-1 == nSize) return send_stat::send_parameter_error; DataPkt Pkt; Pkt.pData = m_pShare->PopPkt(nSize);//获取普通内存池中的一块内存 // pkt.data = new char[n]; Pkt.nSize = nSize; memcpy(Pkt.pData, tmpBuffer, nSize); m_queCmd.push(Pkt);//缓冲指令的地方,m_queCmd算是一个缓冲吧 return SendCmdTry(); } send_stat BaseChannel::SendCmdTry() { if(!m_queCmd.size()) return send_stat::send_succeed; for(;;) { if(m_queCmd.empty()) break; DataPkt pkt = m_queCmd.front(); { send_stat hr = m_pDataLayer->SendData(m_nChannelId, pkt.pData, pkt.nSize); if(hr != send_stat::send_succeed) { //放入等待队列,让GS来发送 m_pShare->PushGcWait(m_nChannelId); return hr; } } m_queCmd.pop(); //delete[] pkt.data; m_pShare->PushPkt(pkt.pData, pkt.nSize);//释放内存池 } return send_stat::send_succeed; } send_stat DataLayer::SendData(int nChannelId, void* pData, int nSize) { #ifdef NET_MUL_PROCESS shareData tmpShareData; tmpShareData.channel_id = nChannelId; tmpShareData.data = pData; tmpShareData.is_data = true; tmpShareData.size = nSize; bool bRet = m_spShareMemInter->pushB(tmpShareData);//现在看这个就应该很好理解了,其实就是从内存中取内存然后放到共享内存队列中 if(bRet) return send_stat::send_succeed; return send_stat::send_buffer_full; #else return m_spTCPServer->send_data(nChannelId, pData, nSize); #endif } 在net主线程 for (;;) { bool bRet = FromMem2Net();//相当于死循环从共享内存中取数据到net中 if(!bRet) { boost::this_thread::interruptible_wait(1); } } bool NetProcSvr::FromMem2Net() { shareData SharePkt; SharePkt.data = m_pSendBuffer; if(m_spShareMemInter->popA(SharePkt) == false)//从共享队列中取包到m_pSendBuffer发送缓冲中 { return false; } if(!SharePkt.is_data) { //主动断开客户端连接 if(SharePkt.size == (int)send_stat::send_disconnected)//现在只是同一个用户登录用户下的两个角色 { return m_spTCPServer->close(SharePkt.channel_id); } } auto eState = m_spTCPServer->send_data(SharePkt.channel_id, SharePkt.data, SharePkt.size); if(eState != send_stat::send_succeed) { if(eState == send_stat::send_disconnected) return true; else if(eState == send_stat::send_buffer_full)//对于libevnet的发送缓冲满了,貌似这个很少吧,不知有多少字节的数据,发送量有多大才会满 { std::string str = boost::lexical_cast<std::string>(SharePkt.channel_id); str += ": send_buffer_full, return ret: "; char buf[10] = {0}; itoa((int)eState, buf, sizeof(eState)); str += buf; MessageBoxA(nullptr, str.c_str(), "send_data警告!", MB_ICONWARNING); std::cout << str << std::endl; } } return true; } //close_channel bool LibEvtServer::close_channel(int channel_id) { auto c2 = m_allChannels[channel_id]; if(!c2->channel || c2->channel->m_is_disconnected) return false; auto c = c2->channel; LibEvtServer::OffChannel2 offc2 = { GetTickCount(), c->m_bev->ev_read.ev_fd, c2}; { std::lock_guard<std::mutex> lock(m_offline_mtx); m_offline_que.push(offc2);//他这个下线只是将其放到下线队列中,等下看看他是何时吧socket关闭的 #ifdef MUL_LIBEVENT_THREAD } {//防止多个libevent thread 线程同时访问下面公共变量 std::lock_guard<std::mutex> lock(m_lts_mtx); #endif m_allChannels[c->m_id] = NULL; m_ids->freeId(c->m_id);//[L]将id归还。 } return true; } //对于这个函数有两个疑问 //1.他到底是如何保证这个套接字有效的 //2.BUFFEREVENT_WRITE到底还是主线程在发送数据,这个和四个子线程什么关系 send_stat LibEvtServer::send_data(int channel_id, void* data, int len) //返回send_stat { #ifndef NET_MUL_ThrSend //多进程模式只有一个线程进此函数 std::lock_guard<std::mutex> lock(m_mtx); #endif if(NULL == data || len <= 0) return send_stat::send_parameter_error; if(++m_send_100count >= 100)//当发送超过100个包是执行一次下线,看来这个踢下线还是在主线程做的 { m_send_100count = 0; free_one_link(); } { //std::lock_guard<std::mutex> lock(m_offline_mtx); auto c2 = m_allChannels[channel_id]; if(!c2 || c2->channel->m_is_disconnected) return send_stat::send_disconnected; auto c = c2->channel; *(int*)m_send_buffer = len; memcpy(m_send_buffer + 4, data, len);//此时他在包当前添了四个字节表示整个包的长度 //auto cur_fd = c->m_bev->ev_write.ev_fd; /** * [说明]:因为是异步读写,bufferevent_write将写入事件丢进工作线程的事件列表中 * 可能存在此主线程在调用bufferevent_free的时候,将套接字关闭(即:主线程立马关闭套接字), * 而下一瞬间工作线程从激活队列中取出此处的写入事件发给客户端时,这个时候,发现此套接字无效, * select的时候发现对无效套接字进行操作,立刻报错!所以需要保证操作的套接字有效 */ if(0 != bufferevent_write(c->m_bev, m_send_buffer, len+4)) //就发送了,客户端的libevent就能收到通知了 { return send_stat::send_buffer_full; } } return send_stat::send_succeed; } bool LibEvtServer::free_one_link() { /** *①让主线程来决定连接,为了保证bufferevent_write的第一个参数绝对有效; *②一次处理一个,保证实时性。 */ if(m_offline_que.size()) { OffChannel2* offc2 = NULL; { std::lock_guard<std::mutex> lock(m_offline_mtx); offc2 = &(m_offline_que.front());//注意需要用成员的引用或地址 } if(offc2->c2) { delete offc2->c2->channel;//bufferevent_free(...) delete offc2->c2; offc2->c2 = nullptr; } if((GetTickCount() - offc2->offtime) > 60*1000)//一分钟让子线程操作完相关套接字,不然无效套接字select报错 { evutil_closesocket(offc2->sockfd); //关闭套接字,相当于clost(socket) std::lock_guard<std::mutex> lock(m_offline_mtx); m_offline_que.pop(); } } return false; } //看看客户端接收到包是如何处理的 如果服务器bufferevent_write的时候客户端的读事件就会被调用,然后他会把它放到流里面,然后从流中读包 bool on_receive_data(int channel_id, void* data, int len) { auto link = g_TcpLinkExs[channel_id];//g_TcpLinkExs保存的是所有的连接,通过channel_id就能够确定是那个客户端连接的 if(!link) return true; //m_event是I_NetEvent类型的,我们这边有好多都是这样做的,别的类保存一个虚基类指针,然后用子类类型进行初始化,到时就拿这个基类指针进行回调 //当时这个过程不是太懂,给我造成了不少麻烦,关键是不太清楚这种用法,看了好久才看出来,而m_event是用NGP初始化的,到时肯定回调NGP里面的相关函数 link->m_event->OnRec(data, len); return true; } bool NGP::OnRec(void* pBuffer, int nSize) { date_pkt pkt; pkt.len = nSize; pkt.date = new char[nSize]; memcpy(pkt.date, pBuffer, nSize); { std::lock_guard<std::mutex> lock(m_PktMutex); m_quePktQueue.push(pkt);//然后这边分配内存,放到这个队列中 } return true; } //然后就是runonce调用不同的function,或者本来就传递一个function过来,看来服务器发送一个包过来也是很麻烦的
标签:style blog io color ar os sp for 数据
原文地址:http://www.cnblogs.com/zzyoucan/p/4098365.html