标签:ring reads 脚本 pushd 用户名 信息 rem 客户端连接 保存
使用CBrother脚本做TCP服务器与C++客户端通信
工作中总是会遇到一些对于服务器压力不是特别大,但是代码量比较多,用C++写起来很不方便。对于这种需求,我选择用CBrother脚本做服务器,之所以不选择Python是因为python的语法我实在是适应不了,再来CBrother的网络框架也是用C++封装的异步IO,性能还是很有保证的。
废话不多说,先来看下服务器代码,我这里只是记录一个例子,不是全部代码,方便后面做项目的时候直接来自己博客复制代码修改。
1 import CBSocket.code //加载Socket扩展 2 3 var g_tcpModule = null; //全局保存TCP模块对象 4 5 const MSG_TYPE_USER = 1; //客户端发来的消息 6 const MSG_TYPE_CLOSE = 2; //socket断线的消息 7 8 const LOGIC_MSG_LOGIN = 1; //登陆消息全局定义 9 const LOGIC_MSG_GETID = 2; //获取ID消息全局定义 10 11 function main(a) //入口函数,cbrother从这里开始执行 12 { 13 var thread = new Thread(); //启动一个数据管理线程,串行处理全局数据,可以根据不同业务启动多个 14 thread.setThreadAction(new ThreadAction()); 15 thread.start(); 16 17 g_tcpModule = new TcpModule(); //启动一个TCP模块 18 var tcpAction = new TcpAction(); 19 tcpAction.thread = thread; 20 g_tcpModule.setTcpAction(tcpAction); //设置TCP的处理类为TcpAction 21 g_tcpModule.addListenPort(6061,"0.0.0.0"); //监听6061端口 22 g_tcpModule.start(); 23 24 print "tcpServer start!"; 25 26 while(1) 27 { 28 Sleep(1000); 29 } 30 }
TcpAction主要处理tcpmodule的消息回调
1 class SocketBuf //这个类会给每个socket创建一个,用来拼包,因为tcp在传输过程中并不保证每次收到都是整包数据 2 { 3 var _byteArray = new ByteArray(1024 * 10); //每个socket预留10K的缓冲 4 5 function SocketBuf() 6 { 7 _byteArray.setLittleEndian(true); //c++编码为低位编址(LE) 如果要高位编码c++里面可以htonl 8 } 9 10 function PushData(bytes,len) 11 { 12 print "pushdata " + len; 13 if(!_byteArray.writeBytes(bytes,len)) 14 { 15 print "socket buf is full!"; 16 return false; 17 } 18 19 return true; 20 } 21 22 function CheckPack() 23 { 24 print "begin CheckPack!"; 25 var writePos = _byteArray.getWritePos(); 26 print "checkpack " + writePos; 27 if(writePos < 4) //前4个字节表示包的长度 28 { 29 print "CheckPack null < 4"; 30 return null; 31 } 32 33 var msglen = _byteArray.readInt(); 34 print "checkpack " + msglen; 35 if(writePos < msglen + 4) //缓存里的数据还不够一包数据,继续等待数据 36 { 37 print "CheckPack null writePos < msglen + 4"; 38 _byteArray.setReadPos(0); 39 return null; 40 } 41 42 //够一包数据了,取出数据,到另一个线程里去处理 43 var newBytes = _byteArray.readBytes(msglen); 44 newBytes.setLittleEndian(true); 45 46 var readPos = _byteArray.getReadPos(); 47 48 _byteArray.copy(_byteArray,0,readPos,writePos - readPos); 49 _byteArray.setReadPos(0); 50 _byteArray.setWritePos(writePos - readPos); 51 print "writePos:" + writePos; 52 print "readPos:" + readPos; 53 //XORCode(newBytes); //异或解密一下,这个为了安全性考虑。我在另一篇博客里专门写这个函数与C++加密函数的对应关系 54 return newBytes; 55 } 56 } 57 58 class TcpAction 59 { 60 var thread; //这个是逻辑线程,这个例子里我只启动一个逻辑线程 61 var sockMap = new Map(); //保存每个socket的消息缓冲,拼包用 62 var lock = new Lock(); //sockMap的锁 63 64 function OnAccept(sock) 65 { 66 print "accept " + sock + " " + g_tcpModule.getRemoteIP(sock); 67 68 //监听到客户端连接,管理起来 69 lock.lock(); 70 var socketbuf = new SocketBuf(); 71 sockMap.add(sock,socketbuf); 72 lock.unlock(); 73 } 74 75 function OnClose(sock) 76 { 77 print "onclose " + sock; 78 79 //断线了,移除掉,并通知逻辑线程 80 lock.lock(); 81 sockMap.remove(sock); 82 lock.unlock(); 83 84 var newmsg = new ThreadMsg(sock,null); 85 newmsg.type = MSG_TYPE_CLOSE; 86 thread.addMsg(newmsg); 87 } 88 89 function OnRecv(sock,byteArray,len) 90 { 91 print "onrecv " + sock + " len:" + len; 92 93 //收到数据获取socket缓冲 94 lock.lock(); 95 var socketbuf = sockMap.get(sock); 96 lock.unlock(); 97 98 if(socketbuf == null) 99 { 100 return; //应该是被关掉了 101 } 102 103 if(!socketbuf.PushData(byteArray,len)) //数据压进去 104 { 105 g_tcpModule.closeSocket(sock); 106 return;//buf满了都解不开包,说明数据有问题,关了它 107 } 108 109 //把包解出来丢到逻辑线程去处理,循环是因为怕buf里同时又好几包数据 110 var newBytes = socketbuf.CheckPack(); 111 while(newBytes != null) 112 { 113 thread.addMsg(new ThreadMsg(sock,newBytes)); 114 newBytes = socketbuf.CheckPack(); 115 } 116 } 117 }
最后是逻辑线程里,其实上面的代码写好了基本上就不动了,后续添加消息号增加处理都是在逻辑线程里去,所以上面的代码可以封装一下,我这里是为了自己好记忆,所以就这样写了。
1 //这个类是线程消息类,可以理解为一个结构体 2 class ThreadMsg 3 { 4 function ThreadMsg(id,bytes) 5 { 6 socketid = id; 7 byteArray = bytes; 8 type = MSG_TYPE_USER; 9 } 10 11 var socketid; 12 var type; 13 var byteArray; 14 } 15 16 class ThreadAction 17 { 18 var _userMap = new Map(); //用户名索引用户信息 19 var _socketMsp = new Map(); //Socket索引用户信息 20 21 var _funcMap = new Map(); //消息对应的处理函数 22 23 function onInit() 24 { 25 print "thread init" ; 26 27 //线程启动时读取数据库数据 因为篇幅问题不写加载数据库的部分了,固定插入两条数据做模拟数据 28 //LoadDB(); 29 _userMap.add("zhangsan",new UserData("zhangsan","123123",1)); 30 _userMap.add("lisi",new UserData("lisi","321321",2)); 31 32 _funcMap.add(LOGIC_MSG_LOGIN,onLogin); //注册消息号的处理函数 33 _funcMap.add(LOGIC_MSG_GETID,onGetID); 34 } 35 36 function onMsg(msg) 37 { 38 switch(msg.type) 39 { 40 case MSG_TYPE_CLOSE: 41 { 42 //断线消息 43 print "MSG_TYPE_CLOSE"; 44 OnClose(msg.socketid); 45 break; 46 } 47 case MSG_TYPE_USER: 48 { 49 //客户端发来的消息,客户端发来的数据结构都是从struct MsgBase派生,所以前4个字节就是struct MsgBase的msgid 50 var msgid = msg.byteArray.readInt(); 51 var func = _funcMap.get(msgid); 52 print "MSG_TYPE_USER" + msgid; 53 if(func != null) 54 { 55 func.invoke(msg.socketid,msg.byteArray); 56 } 57 break; 58 } 59 } 60 } 61 62 function onEnd() 63 { 64 print "thread end"; 65 } 66 67 function SendData(socketid,byteArray) //发送数据给客户端的函数 68 { 69 //XORCode(byteArray); //异或加密一下 70 var lenByte = new ByteArray(); 71 lenByte.setLittleEndian(true); 72 lenByte.writeInt(byteArray.getWritePos()); 73 74 g_tcpModule.sendData(socketid,lenByte); 75 g_tcpModule.sendData(socketid,byteArray); 76 } 77 78 function OnClose(socketid) 79 { 80 //关闭的时候从socket管理里移除,清理在线状态 81 var userdata = _socketMsp.get(socketid); 82 if(userdata == null) 83 { 84 return; 85 } 86 userdata._IsOnLine = false; 87 userdata._SocketID = 0; 88 _socketMsp.remove(socketid); 89 } 90 91 function onLogin(socketid,byteArray) 92 { 93 print "onLogin"; 94 95 var namebuf = byteArray.readBytes(32); //这个长度要跟C++结构体对应 char name[32]; 96 var name = namebuf.readString(); 97 var pwdbuf = byteArray.readBytes(32); //这个长度要跟C++结构体对应 char pwd[32]; 98 var pwd = pwdbuf.readString(); 99 100 print name; 101 print pwd; 102 103 var userdata = _userMap.get(name); 104 if(userdata == null) 105 { 106 //没有找到用户名,用户名错误 107 var resBytes = new ByteArray(); 108 resBytes.writeInt(LOGIC_MSG_LOGIN); 109 resBytes.writeInt(1); //回复1表示帐号或者密码错误错误 110 SendData(socketid,resBytes); 111 print "name err!"; 112 return; 113 } 114 115 if(pwd != userdata._UserPwd) 116 { 117 var resBytes = new ByteArray(); 118 resBytes.writeInt(LOGIC_MSG_LOGIN); 119 resBytes.writeInt(1); //回复1表示帐号或者密码错误错误 120 SendData(socketid,resBytes); 121 print "pwd err!"; 122 return; 123 } 124 125 if(userdata._IsOnLine) //这个帐号已经登录过了,冲掉 126 { 127 g_tcpModule.closeSocket(userdata._SocketID); 128 OnClose(userdata._SocketID); 129 } 130 131 //登陆成功,添加进socket管理,并至在线状态 132 userdata._IsOnLine = true; 133 userdata._SocketID = socketid; 134 _socketMsp.add(socketid,userdata); 135 136 var resBytes = new ByteArray(); 137 resBytes.setLittleEndian(true); 138 resBytes.writeInt(LOGIC_MSG_LOGIN); 139 resBytes.writeInt(2); //回复2表示登录成功 140 SendData(socketid,resBytes); 141 print "login suc!"; 142 } 143 144 function onGetID(socketid,byteArray) 145 { 146 var userdata = _socketMsp.get(socketid); 147 if(userdata == null) 148 { 149 return; //该socket不在线,不处理 150 } 151 152 var resBytes = new ByteArray(); 153 resBytes.setLittleEndian(true); 154 resBytes.writeInt(LOGIC_MSG_GETID); 155 resBytes.writeInt(userdata._UserID); 156 SendData(socketid,resBytes); 157 } 158 }
服务器代码完成了,再来看看C++客户端代码。
1 enum 2 { 3 LOGIC_MSG_LOGIN = 1, //登陆 4 LOGIC_MSG_GETID = 2, //获取ID 5 }; 6 7 struct MsgBase //消息基类 8 { 9 int msgid; 10 }; 11 12 struct MsgLogin : public MsgBase //登陆消息 13 { 14 char name[32]; 15 char pwd[32]; 16 }; 17 18 struct MsgLoginRet : public MsgBase //登陆返回 19 { 20 int res; 21 }; 22 23 struct MsgGetID : public MsgBase //获取ID消息 24 { 25 }; 26 27 struct MsgGetIDRet : public MsgBase //获取ID返回 28 { 29 int userid; 30 }; 31 32 //接收服务器消息,将数据放到recvBuf里 33 bool RecvData(int sock,char* recvBuf) 34 { 35 int alllen = 0; 36 int len = recv(sock,recvBuf,4,0); //先读4个字节为消息长度 37 if (len <= 0) 38 { 39 return false; //socket被关闭了 40 } 41 42 alllen += len; 43 while (alllen < 4) 44 { 45 len = recv(sock,recvBuf + len,4 - len,0); 46 if (len <= 0) 47 { 48 return false; //socket被关闭了 49 } 50 51 alllen += len; 52 } 53 54 int msglen = *((int*)recvBuf); 55 56 //再将消息内容读入recvBuf 57 alllen = 0; 58 len = recv(sock,recvBuf,msglen,0); 59 if (len <= 0) 60 { 61 return false; //socket被关闭了 62 } 63 64 alllen += len; 65 while (alllen < msglen) 66 { 67 len = recv(sock,recvBuf + len,msglen - len,0); 68 if (len <= 0) 69 { 70 return false; //socket被关闭了 71 } 72 73 alllen += len; 74 } 75 76 return true; 77 } 78 79 //发送数据 80 bool SendData(int sock,MsgBase* pbase,int len) 81 { 82 //XORBuf((char*)pbase,sizeof(len)); //异或加密,下一篇博客专门写这个函数 83 84 send(sock,(const char*)&len,4,0); //发送长度 85 send(sock,(const char*)pbase,len,0); //发送数据 86 return true; 87 } 88 89 int _tmain(int argc, _TCHAR* argv[]) 90 { 91 WORD sockVersion = MAKEWORD(2, 2); 92 WSADATA wsaData; 93 if (WSAStartup(sockVersion, &wsaData) != 0) 94 { 95 return 0; 96 } 97 98 int clinetsocket = socket(PF_INET, SOCK_STREAM, 0); 99 if (clinetsocket == -1) 100 { 101 return 0; 102 } 103 104 struct hostent *hptr = gethostbyname("127.0.0.1"); 105 106 struct sockaddr_in address; 107 address.sin_family = AF_INET; 108 address.sin_addr.s_addr = *(u_long*)hptr->h_addr_list[0]; 109 address.sin_port = htons(6061); 110 111 int result = connect(clinetsocket, (struct sockaddr *)&address, sizeof(address)); 112 if (result == -1) 113 { 114 return NULL; 115 } 116 117 //定义登陆消息结构 118 MsgLogin msg; 119 msg.msgid = LOGIC_MSG_LOGIN; 120 strcpy(msg.name,"lisi"); 121 strcpy(msg.pwd,"321321"); 122 int len = sizeof(msg); 123 SendData(clinetsocket,&msg,len); 124 125 char recvBuf[1024] = {0}; 126 127 while (true) 128 { 129 if(!RecvData(clinetsocket,recvBuf)) 130 { 131 printf("socket close!\n"); //连接断了 132 break; 133 } 134 135 //收到的数据先转为pBase 看前4个字节的msgid 136 MsgBase* pBase = (MsgBase*)recvBuf; 137 138 switch (pBase->msgid) 139 { 140 case LOGIC_MSG_LOGIN: 141 { 142 //登陆返回 143 MsgLoginRet* mlr = (MsgLoginRet*)pBase; 144 if (mlr->res == 1) 145 { 146 printf("login err!\n"); 147 } 148 else 149 { 150 printf("login suc!\n"); 151 152 //请求ID 153 MsgGetID msggetid; 154 msggetid.msgid = LOGIC_MSG_GETID; 155 len = sizeof(msggetid); 156 SendData(clinetsocket,&msggetid,len); 157 } 158 break; 159 } 160 case LOGIC_MSG_GETID: 161 { 162 //请求ID返回 163 MsgGetIDRet* mgir = (MsgGetIDRet*)pBase; 164 printf("userid : %d\n",mgir->userid); 165 break; 166 } 167 } 168 } 169 170 return 0; 171 }
这样客户端和服务器就都完成了,下面再来记录一下C++消息结构序列化后的二进制流。
MsgBase为所有消息的基类,所以从它派生的结构体前4个字节肯定是整形的msgid。在服务端直接readInt读取前4个字节就表示读取了MsgBase里的msgid。
MsgLogin有两个成员变量都为char[32]数组,所以这个结构体的总字节大小是64,除掉前4个字节是msgid意外,readBytes(32)表示读取这个数组,再readString表示获取\0结尾的字符串
MsgLoginRet只有一个成员变量,所以服务器第一个writeInt表示填充基类的msgid,第二个writeInt表示res。
之后的逻辑就都是添加消息号和消息结构做逻辑了,用脚本做服务器编码效率还是非常高的。
标签:ring reads 脚本 pushd 用户名 信息 rem 客户端连接 保存
原文地址:https://www.cnblogs.com/aibiancheng123/p/10269354.html