在前面的章节中,介绍了网络传输、任务线程池、数据库和集群四个主要功能模块。到现在为止,这些模块都还只是一种资源,没有产生实际的运行效果。对一个具备真实功能的应用来说,需要有一个整合的过程。整合方法很多,这里以典型的客户 -客户通信来举例说明。
1、“客户端” 这个概念被抽象为一个节点类st_clientNode,每个客户端连接对应了该类的一个实例。这个类不但存储了有关该连接的所有背景信息(比如聊天程序中的用户名等),还提供了正确解释数据流的代码实现。由于想分开传输层和应用层的代码,实际例子中,该类被分为基础传输类st_clientNode_baseTrans和应用层类st_clientNodeAppLayer两部分。
2、 对没有在本服务器登录的客户端来说,通过集群的服务器-服务器传输链路实现跨服务器通信。“其他服务器”这个概念被抽象为一个节点类st_cross_svr_node,集群内每个服务器连接对应了该类的一个实例。这个类不但存储了有关该连接的所有背景信息(比如服务器的名字、地址等),还提供了正确解释数据流的代码实现。该类实现了三种集群消息类型。
(1) 新客户端登入广播,用于通知集群内所有服务器,有新客户端登入
(2) 客户端退出广播。
(3) 客户端数据流打包传输
3、在最上层,有一个本服务器进程的管理者,称作st_client_table,用于封装所有的服务功能。这个类在每个服务器进程中仅有一个实例。它主要的工作有:
(1) 提供一个盛放、管理各个客户端节点类(st_clientNodeAppLayer实例)、各个集群服务器节点类(st_cross_svr_node实例)的容器;
(2) 提供一个管理本地客户端哈希表的执行者
(3) 提供一个管理全局客户端哈希表的执行者
(4) 把网络传输、任务线程池、数据库和集群四个主要功能模块的信号、槽全部关联起来
(5) 提供负载均衡建议, 在本服务器满员后,建议客户端到集群内最空闲的服务器进程登录。
这些类的合作关系如下:
客户端哈希的目的是为了迅速通过用户名、套接字找到对应的套接字和客户端对象。他们在st_client_table类中定义。为了在较大规模下,仍然获得较好的效率,使用了STL的 unordered_map类。使用基于树的map、QMap也是可以的,但是访问效率不如哈希快。
//st_client_table 本地客户端Hash 成员 QMutex m_hash_mutex; std::unordered_map<quint32,st_clientNode_baseTrans *> m_hash_uuid2node; std::unordered_map<QObject *,st_clientNode_baseTrans *> m_hash_sock2node; //st_client_table 远程客户端Hash 成员 std::unordered_map<quint32,QString> m_hash_remoteClient2SvrName; QMutex m_mutex_cross_svr_map;
*本地哈希成员存储了从套接字到用户节点的映射。使用套接字可迅速得到节点指针。这样,在收到数据时,可直接定位到用户节点。之所以没有从QTcpSocket派生一个子类,存放从套接字到用户节点的映射,是因为套接字的生存周期控制复杂,客户端频繁的登入、登出,一旦套接字对象删除失效,程序中其他位置使用套接字指针直接获取客户端节点指针的操作就会溢出。
*远程哈希成员存储了从客户端 UUID到集群服务器名称的映射。用于向远程客户端发送数据使用。
为了把本文涉及的所有模块沟通起来,st_client_table类做了很多工作。
首先,网络模块、集群会发出信号,他们由st_client_table类响应并处理或移交。
//连接新用户来到信号,将分配新的用户节点、登记哈希表(对一般的Sock)、广播集群信息 connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection); //连接新用户已保护信号,将分配新的用户节点、登记哈希表(对SSL Sock) connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientEncrypted,this,&st_client_table::on_evt_ClientEncrypted,Qt::QueuedConnection); //连接新用户断开信号,将清除哈希表、广播集群信息 connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection); //连接用户数据到来信号,将直接向各个客户端的处理队列中push ,以待线程池解译客户端消息 connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection); //连接用户数据成功发送信号,目前只用于统计流量 connect (m_pThreadEngine,&ZPNetwork::zp_net_Engine::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection); //连接新集群节点接入信号,将向该节点发送HELLO包,并交换持有的客户端情况 connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrConnected,this,&st_client_table::on_evt_NewSvrConnected,Qt::QueuedConnection); //连接集群节点断开信号,将清除属于该节点的客户端全局哈希 connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_NewSvrDisconnected,this,&st_client_table::on_evt_NewSvrDisconnected,Qt::QueuedConnection); //集群节点数据接收,将直接向各个集群节点的处理队列中push ,以待线程池解译集群消息 connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_recieved,this,&st_client_table::on_evt_RemoteData_recieved,Qt::QueuedConnection); //集群节点数据发送, 只用于流量统计 connect (m_pCluster,&ZP_Cluster::zp_ClusterTerm::evt_RemoteData_transferred,this,&st_client_table::on_evt_RemoteData_transferred,Qt::QueuedConnection);
//this event indicates new client connected. void st_client_table::on_evt_NewClientConnected(QObject * clientHandle) { bool nHashContains = false; st_clientNode_baseTrans * pClientNode = 0; m_hash_mutex.lock(); nHashContains = (m_hash_sock2node.find(clientHandle)!=m_hash_sock2node.end())?true:false; if (false==nHashContains) { st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0); //using queued connection of send and revieve; connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadEngine,&ZPNetwork::zp_net_Engine::SendDataToClient,Qt::QueuedConnection); connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadEngine,&ZPNetwork::zp_net_Engine::KickClients,Qt::QueuedConnection); connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection); m_hash_sock2node[clientHandle] = pnode; nHashContains = true; pClientNode = pnode; } else { pClientNode = m_hash_sock2node[clientHandle]; } m_hash_mutex.unlock(); assert(nHashContains!=0 && pClientNode !=0); }上面的代码中连接了三组信号,第一组把客户端发送数据的信号与网络模块连接起来,如客户端A发出该信号,含有客户端B的标示,将由网络模块响应,并最终发给客户端B。第二组连接了客户端踢出信号,比如客户端A想踢出客户端B,则把B的标示泵出,由网络模块响应并踢出B。第三组连接了消息显示信号。
前述,集群模块只提供了各个集群节点的物理连接,并没有实现具体的功能。因此,作为一个特定的应用,需要在集群模块上实现一个应用层功能。这个应用层功能有两部分组成。
一个是集群的应用层实现类st_cross_svr_node, 由集群节点类 ZP_Cluster::zp_ClusterNode派生,负责信令解译。 为了让集群知道这个类的存在,并使用该类而不是基类作为节点类,在st_client_table类构造函数中注册了本应用层实现类的工厂,这个工厂方法被用于产生st_cross_svr_node类的实例。
//zp_clusterterm.h //... /** * The factory enables user-defined sub-classes inherits from zp_ClusterNode * Using SetNodeFactory , set your own allocate method. *注册工厂的方法/ void SetNodeFactory(std::function< zp_ClusterNode * ( zp_ClusterTerm * /*pTerm*/, QObject * /*psock*/, QObject * /*parent*/) > ); std::function<zp_ClusterNode * ( zp_ClusterTerm * /*pTerm*/, QObject * /*psock*/, QObject * /*parent*/)> m_factory; //zp_clusterterm.cpp zp_ClusterTerm::zp_ClusterTerm(/*...*/ ) { //... m_factory = std::bind(&zp_ClusterTerm::default_factory,this,_1,_2,_3); } /** * @brief The factory enables user-defined sub-classes inherits from zp_ClusterNode * Using SetNodeFactory , set your own allocate method. * @fn zp_ClusterTerm::default_factory the default factory function. just return zp_ClusterTerm * * @param pTerm Term object * @param psock Sock Object * @param parent Parent * @return zp_ClusterNode * 默认工厂产生zp_ClusterNode 类的实例 */ zp_ClusterNode * zp_ClusterTerm::default_factory( zp_ClusterTerm * pTerm, QObject * psock, QObject * parent) { return new zp_ClusterNode(pTerm,psock,parent); } //st_client_table.cpp st_client_table::st_client_table(...) { m_pCluster->SetNodeFactory( std::bind(&st_client_table::cross_svr_node_factory, this, _1,_2,_3) );//绑定工厂回调方法 } ZP_Cluster::zp_ClusterNode * st_client_table::cross_svr_node_factory( ZP_Cluster::zp_ClusterTerm * pTerm, QObject * psock, QObject * parent) { st_cross_svr_node * pNode = new st_cross_svr_node(pTerm,psock,parent); pNode->setClientTable(this); return pNode;//实际产生的是st_cross_svr_node类的实例 }
另一个是集群的应用层消息,如下:
#ifndef ST_CROSS_SVR_MSG_H #define ST_CROSS_SVR_MSG_H namespace ExampleServer{ #pragma pack (push,1) #if defined(__GNUC__) #include <stdint.h> typedef struct tag_example_crosssvr_msg{ struct tag_msgHearder{ __UINT16_TYPE__ Mark; //Always be "0x4567" __UINT16_TYPE__ version; //Structure Version __UINT8_TYPE__ mesageType; __UINT32_TYPE__ messageLen; } header; union union_payload{ __UINT8_TYPE__ data[1]; __UINT32_TYPE__ uuids[1]; } payload; } EXAMPLE_CROSSSVR_MSG; #endif #if defined(_MSC_VER) typedef struct tag_example_crosssvr_msg{ struct tag_msgHearder{ unsigned __int16 Mark; //Always be 0x4567 unsigned __int16 version; //Structure Version unsigned __int8 mesageType; unsigned __int32 messageLen; } header; union union_payload{ unsigned __int8 data[1]; unsigned __int32 uuids[1]; } payload; } EXAMPLE_CROSSSVR_MSG; #endif #pragma pack(pop) } #endif
基于上述的数据结构和处理方法,实现了三个messageType消息类型。
0x01 为 远程客户端登入消息,payload为各个新登入到该服务器客户端的UUID
0x02 为 远程客户端退出消息,payload为各个从该服务器退出的客户端的UUID
0x03 为 客户端载荷消息,封装了从远程客户端发给本地客户端之间的客户端-客户端消息。
无中心的服务器集群通过心跳广播在各个节点上存储了当前所有节点的负荷。当客户端登入时,会首先检查当前服务器负荷是否超过门限,一旦超过,则触发均衡建议。
bool st_client_table::NeedRedirect(quint8 bufAddresses[/*64*/],quint16 * pnPort) { if (m_pCluster->clientNums()<m_nBalanceMax) return false; QString strServerName = m_pCluster->minPayloadServer(bufAddresses,pnPort); if (strServerName==m_pCluster->name()) return false; return true; } bool st_clientNodeAppLayer::LoginClient() { //... stMsg_ClientLoginRsp & reply = new stMsg_ClientLoginRsp (...); //... //Cluster-Balance. if (m_pClientTable->NeedRedirect(reply.Address_Redirect,&reply.port_Redirect)) { reply.DoneCode = 1; // } //... emit evt_SendDataToClient(this->sock(),reply); }
C/S 架构服务器实现方式很多,应用案例成千上万。本范例为了演示基本的知识点,采用的设计思路并不是单纯从性能出发的。基于Qt实现,有助于利用Qt本身的引用计数、跨平台等特性,同时,Qt也是封装的非常棒的库,使得我们可以抛开繁琐的API,直接研究问题本身,为Linux, Windows下的同学提供统一的参考范例。感谢为Qt的进步付出心血的贡献者们,他们的不懈坚持让C++语言拥有了一个完整的跨平台UI框架。
一种基于Qt的可伸缩的全异步C/S架构服务器实现(六) 整合各个模块实现功能,布布扣,bubuko.com
一种基于Qt的可伸缩的全异步C/S架构服务器实现(六) 整合各个模块实现功能
原文地址:http://blog.csdn.net/goldenhawking/article/details/37345809