标签:cto overflow broker 任务 认证 长度 关于 work short
用户自定义协议client/server代码示例
代码参考链接:https://github.com/sogou/workflow
message.h
message.cc
server.cc
client.cc
关于user_defined_protocol
本示例设计一个简单的通信协议,并在协议上构建server和client。server将client发送的消息转换成大写并返回。
协议的格式
协议消息包含一个4字节的head和一个message
body。head是一个网络序的整数,指明body的长度。
请求和响应消息的格式一致。
协议的实现
用户自定义协议,需要提供协议的序列化和反序列化方法,这两个方法都是ProtocolMeessage类的虚函数。
另外,为了使用方便,强烈建议用户实现消息的移动构造和移动赋值(用于std::move())。 在ProtocolMessage.h里,序列化反序列化接口如下:
namespace protocol
{
class ProtocolMessage : public CommMessageOut, public CommMessageIn
{
private:
virtual int encode(struct iovec vectors[], int max);
/* You have to implement one of the ‘append‘ functions, and the first one
* with arguement ‘size_t *size‘ is recommmended. */
virtual int append(const void *buf, size_t *size);
virtual int append(const void *buf, size_t size);
...
};
}
序列化函数encode
反序列化函数append
errno的设置
示例里,消息的序列化反序列化都非常的简单。
头文件message.h里,声明了request和response类:
namespace protocol
{
class TutorialMessage : public ProtocolMessage
{
private:
virtual int encode(struct iovec vectors[], int max);
virtual int append(const void *buf, size_t size);
...
};
using TutorialRequest = TutorialMessage;
using TutorialResponse = TutorialMessage;
}
request和response类,都是同一种类型的消息。直接using就可以。
注意request和response必须可以无参数的被构造,也就是说需要有无参数的构造函数,或完全没有构造函数。
此外,通讯过程中,如果发生重试,response对象会被销毁并重新构造。因此,它最好是一个RAII类。否则处理起来会比较复杂。
message.cc里包含了encode和append的实现:
namespace protocol
{
int TutorialMessage::encode(struct iovec vectors[], int max/*max==8192*/)
{
uint32_t n = htonl(this->body_size);
memcpy(this->head, &n, 4);
vectors[0].iov_base = this->head;
vectors[0].iov_len = 4;
vectors[1].iov_base = this->body;
vectors[1].iov_len = this->body_size;
return 2; /* return the number of vectors used, no more then max. */
}
int TutorialMessage::append(const void *buf, size_t size)
{
if (this->head_received < 4)
{
size_t head_left;
void *p;
p = &this->head[this->head_received];
head_left = 4 - this->head_received;
if (size < 4 - this->head_received)
{
memcpy(p, buf, size);
this->head_received += size;
return 0;
}
memcpy(p, buf, head_left);
size -= head_left;
buf = (const char *)buf + head_left;
p = this->head;
this->body_size = ntohl(*(uint32_t *)p);
if (this->body_size > this->size_limit)
{
errno = EMSGSIZE;
return -1;
}
this->body = (char *)malloc(this->body_size);
if (!this->body)
return -1;
this->body_received = 0;
}
size_t body_left = this->body_size - this->body_received;
if (size > body_left)
{
errno = EBADMSG;
return -1;
}
memcpy(this->body, buf, body_left);
if (size < body_left)
return 0;
return 1;
}
}
encode的实现非常简单,固定使用了两个vector,分别指向head和body。需要注意iov_base指针必须指向消息类的成员。
append需要保证4字节的head接收完整,再读取message body。而且我们并不能保证第一次append一定包含完整的head,所以过程略为繁琐。
append实现了size_limit功能,超过size_limit的会返回EMSGSIZE错误。用户如果不需要限制消息大小,可以忽略size_limit这个域。
由于要求通信协议是一来一回的,所谓的“TCP包”问题不需要考虑,直接当错误消息处理。
现在,有了消息的定义和实现,就可以建立server和client了。
server和client的定义
有了request和response类,我们就可以建立基于这个协议的server和client。前面的示例里介绍过Http协议相关的类型定义:
using WFHttpTask = WFNetworkTask<protocol::HttpRequest,
protocol::HttpResponse>;
using http_callback_t = std::function<void (WFHttpTask *)>;
using WFHttpServer = WFServer<protocol::HttpRequest,
protocol::HttpResponse>;
using http_process_t = std::function<void (WFHttpTask *)>;
同样的,对这个Tutorial协议,数据类型的定义并没有什么区别:
using WFTutorialTask = WFNetworkTask<protocol::TutorialRequest,
protocol::TutorialResponse>;
using tutorial_callback_t = std::function<void (WFTutorialTask *)>;
using WFTutorialServer = WFServer<protocol::TutorialRequest,
protocol::TutorialResponse>;
using tutorial_process_t = std::function<void (WFTutorialTask *)>;
server端
server与普通的http
server没有什么区别。优先IPv6启动,这不影响IPv4的client请求。另外限制请求最多不超过4KB。
代码请自行参考server.cc
client端
client端的逻辑是从标准IO接收用户输入,构造出请求发往server并得到结果。
为了简单,读取标准输入的过程都在callback里完成,因此我们会先发出一条空请求。同样为了安全我们限制server回复包不超4KB。
client端唯一需要了解的就是怎么产生一个自定义协议的client任务,在WFTaskFactory.h有三个接口可以选择:
template<class REQ, class RESP>
class WFNetworkTaskFactory
{
private:
using T = WFNetworkTask<REQ, RESP>;
public:
static T *create_client_task(TransportType type,
const std::string& host,
unsigned short port,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const std::string& url,
int retry_max,
std::function<void (T *)> callback);
static T *create_client_task(TransportType type,
const URI& uri,
int retry_max,
std::function<void (T *)> callback);
...
};
其中,TransportType指定传输层协议,目前可选的值包括TT_TCP,TT_UDP,TT_SCTP和TT_TCP_SSL。
三个接口的区别不大,在这个示例里暂时不需要URL,用域名和端口来创建任务。
实际的调用代码如下。派生了WFTaskFactory类,但这个派生并非必须的。
using namespace protocol;
class MyFactory : public WFTaskFactory
{
public:
static WFTutorialTask *create_tutorial_task(const std::string& host,
unsigned short port,
int retry_max,
tutorial_callback_t callback)
{
using NTF = WFNetworkTaskFactory<TutorialRequest, TutorialResponse>;
WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port,
retry_max,
std::move(callback));
task->set_keep_alive(30 * 1000);
return task;
}
};
可以看到用了WFNetworkTaskFactory<TutorialRequest,
TutorialResponse>类来创建client任务。
接下来通过任务的set_keep_alive()接口,让连接在通信完成之后保持30秒,否则,将默认采用短连接。
client的其它代码涉及的知识点在之前的示例里都包含了。请参考client.cc
内置协议的请求是怎么产生的
现在系统中内置了http, redis,mysql,kafka四种协议。可以通过相同的方法产生一个http或redis任务吗?比如:
WFHttpTask *task = WFNetworkTaskFactory<protocol::HttpRequest, protocol::HttpResponse>::create_client_task(...);
需要说明的是,这样产生的http任务,会损失很多的功能,比如,无法根据header来识别是否用持久连接,无法识别重定向等。
同样,如果这样产生一个MySQL任务,可能根本就无法运行起来。因为缺乏登录认证过程。
一个kafka请求可能需要和多台broker有复杂的交互过程,这样创建的请求显然也无法完成这一过程。
可见每一种内置协议消息的产生过程都远远比这个示例复杂。同样,如果用户需要实现一个更多功能的通信协议,还有许多代码要写。
标签:cto overflow broker 任务 认证 长度 关于 work short
原文地址:https://www.cnblogs.com/wujianming-110117/p/14053297.html