标签:
本文首先介绍了什么是Apache Thrift,接着介绍了Thrift的安装部署及如何利用Thrift来实现一个简单的RPC应用,并简单的探究了一下Thrift的内部实现原理,最后给出一个基于Thrift的可扩展的分布式RPC调用框架,在中小型项目中是一个常见的SOA实践。
Apache Thrift是Facebook 开发的远程服务调用框架,它采用接口描述语言(IDL)定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk 等创建高效的、无缝的服务,其传输数据采用二进制格式,相对 XML 和 JSON 体积更小,对于高并发、大数据量和多语言的环境更有优势。本文将详细介绍 Thrift 的使用,并简要分析Thrift的底层运行原理,最后给出一个基于Thrift的可扩展分布式RPC框架。
yum -y install automake gcc gcc-c++
wget http://ftp.gnu.org/gnu/bison/bison-2.5.1.tar.gz
tar -zxvf bison-2.5.1.tar.gz
cd bison-2.5.1
./configure --prefix=/usr/local/
make;make install
yum -y install libevent2-devel zlib-devel openssl-devel
wget http://sourceforge.net/projects/boost/files/boost/1.55.0/boost_1_55_0.tar.gz
tar -zxvf boost_1_55_0.tar.gz
cd boost_1_55_0
./bootstrap.sh
./b2 install
Wget http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.tar.gz
cd thrift-0.9.3
./configure;make;make install
Thrift代码包(位于thrift-0.9.3/lib/cpp/src)有以下几个目录:
concurrency:并发和时钟管理方面的库
processor:Processor相关类
protocol:Protocal相关类
transport:transport相关类
server:server相关类
async:异步rpc相关类
这里介绍一个简单的 Thrift 实现实例,使读者能够快速直观地了解什么是 Thrift 以及如何使用 Thrift 构建服务。
创建一个简单的服务Log。
struct LogInfo { 1: required string name, 2: optional string content, } service LogSender { void SendLog(1:list<LogInfo> loglist); string GetLog(1:string logname); }
其中定义了服务 Log 的两个方法,每个方法包含一个方法名,参数列表和返回类型。每个参数包括参数序号,参数类型以及参数名。 Thrift 是对 IDL(Interface Definition Language) 描述性语言的一种具体实现。因此,以上的服务描述文件使用 IDL 语法编写。使用 Thrift 工具编译 log.thrift,就会生成相应的 LogSender.cpp 文件。该文件包含了在 log.thrift 文件中描述的服务Log的接口定义以及服务调用的底层通信细节,用于构建客户端和服务器端的功能。
调用thrift命令生成代码,命令为thrift --gen <language> <Thrift filename>
[root@localhost log_thrift]# thrift -gen cpp log.thrift
[root@localhost log_thrift]# tree gen-cpp/
gen-cpp/
├── log_constants.cpp
├── log_constants.h
├── LogSender.cpp
├── LogSender.h
├── LogSender_server.skeleton.cpp
├── log_types.cpp
└── log_types.h
每个thrift文件会产生四个文件,分别为:${thrift_name}_constants.h,${thrift_name}_constants.cpp,${thrift_name}_types.h,${thrift_name}_types.cpp,
对于含有service的thrift文件,会额外生成两个文件,分别为:${service_name}.h,${service_name}.cpp
对于含有service的thrift文件,会生成一个可用的server桩:${service_name}_server.skeleton.cpp
#include "gen-cpp/LogSender.h" #include <map> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using boost::shared_ptr; std::map<std::string, std::string> logMap; class LogSenderHandler : virtual public LogSenderIf { public: LogSenderHandler() { // Your initialization goes here } void SendLog(const std::vector<LogInfo> & loglist) { // Your implementation goes here sleep(5); time_t now = time(NULL); printf("SendLog, now = %s\n", ctime(&now)); for (size_t i = 0; i < loglist.size(); ++i) { if (logMap.find(loglist[i].name) == logMap.end()) { printf("name=[%s], content=[%s]\n", loglist[i].name.c_str(), loglist[i].content.c_str()); logMap.insert(std::make_pair(loglist[i].name, loglist[i].content)); } } } void GetLog(std::string& _return, const std::string& logname) { // Your implementation goes here std::map<std::string,std::string>::iterator iter = logMap.find(logname); if (iter != logMap.end()) { _return = iter->second; } else { _return = "Not Found!"; } } }; int main(int argc, char **argv) { int port = 9090; shared_ptr<LogSenderHandler> handler(new LogSenderHandler()); shared_ptr<TProcessor> processor(new LogSenderProcessor(handler)); shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); server.serve(); return 0; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdint.h> #include <string> #include "gen-cpp/log_constants.h" #include "gen-cpp/log_types.h" #include "gen-cpp/LogSender.h" #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; void send_log(const std::string& strName, const std::string& strContent) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); vector<LogInfo> logInfos; LogInfo logInfo; logInfo.__set_name(strName); logInfo.__set_content(strContent); logInfos.push_back(logInfo); client.SendLog(logInfos); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } void get_log(const std::string& strName) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); std::string strResult; client.GetLog(strResult, strName); printf("GetLog: name = %s, log = %s\n", strName.c_str(), strResult.c_str()); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } int main(int argc, char** argv) { send_log("log1", "this is a example1"); get_log("log1"); get_log("log2"); return 0; }
#include "gen-cpp/LogSender.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TNonblockingServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/concurrency/PosixThreadFactory.h> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::apache::thrift::concurrency; using boost::shared_ptr; #define THREAD_NUM 5 std::map<std::string, std::string> logMap; class LogSenderHandler : virtual public LogSenderIf { public: LogSenderHandler() { // Your initialization goes here } void SendLog(const std::vector<LogInfo> & loglist) { // Your implementation goes here sleep(5); time_t now = time(NULL); printf("SendLog, now = %s\n", ctime(&now)); for (size_t i = 0; i < loglist.size(); ++i) { if (logMap.find(loglist[i].name) == logMap.end()) { logMap.insert(std::make_pair(loglist[i].name, loglist[i].content)); } } } void GetLog(std::string& _return, const std::string& logname) { // Your implementation goes here std::map<std::string,std::string>::iterator iter = logMap.find(logname); if (iter != logMap.end()) { _return = iter->second; } else { _return = "Not Found!"; } } }; int main(int argc, char **argv) { int port = 9090; shared_ptr<LogSenderHandler> handler(new LogSenderHandler()); shared_ptr<TProcessor> processor(new LogSenderProcessor(handler)); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(THREAD_NUM); shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory> (new PosixThreadFactory()); threadManager->threadFactory(threadFactory); threadManager->start(); TNonblockingServer server(processor, protocolFactory, port, threadManager); server.serve(); return 0; }
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <stdint.h> #include <string> #include "gen-cpp/log_constants.h" #include "gen-cpp/log_types.h" #include "gen-cpp/LogSender.h" #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; void send_log(const std::string& strName, const std::string& strContent) { boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 9090)); //对接nonblockingServer时必须的,对普通server端时用boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TTransport> transport(new TFramedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); LogSenderClient client(protocol); try { transport->open(); vector<LogInfo> logInfos; LogInfo logInfo; logInfo.__set_name(strName); logInfo.__set_content(strContent); logInfos.push_back(logInfo); client.SendLog(logInfos); transport->close(); } catch (TException &tx) { printf("ERROR: %s\n", tx.what()); } } int main(int argc, char** argv) { send_log("log1", "this is a example1"); return 0; }
运行阻塞式服务器,同时启动10个client,可以观察到,由于阻塞服务器sleep(5)模拟每个调用,这里每次调用之间都相差5秒,相当于是串行进行处理的:
运行非阻塞服务器,同时启动10个client,由于使用了非阻塞加线程池(这里线程池大小为5)模式,同样是sleep(5)的模拟处理,这里的处理速度和吞吐量都大大提高。
thrift文件内容可能会随着时间变化的。如果已经存在的消息类型不再符合设计要求,比如,新的设计要在message格式中添加一个额外字段,但你仍想使用以前的thrift文件产生的处理代码。如果想要达到这个目的,需要:
(1)不要修改已存在域的整数编号
(2)新添加的域必须是optional的,以便格式兼容。
比如对于上面例子中的log.thrift
struct LogInfo {
1: required string name,
2: optional string content,
}
content是optional的,需要将它的__isset值设为true,才能序列化并传输,否则会认为字段不存在,不会被序列化。比如client.cpp中的代码,如果我们将content字段__isset设为false,则server将不会收到content:
logInfo.__isset.content=false;
程序运行完了,我们来看一下client.GetLog()函数的内部实现(在LogSender.cpp中)
void LogSenderClient::GetLog(std::string& _return, const std::string& logname) { send_GetLog(logname); recv_GetLog(_return); } void LogSenderClient::send_GetLog(const std::string& logname) { int32_t cseqid = 0; oprot_->writeMessageBegin("GetLog", ::apache::thrift::protocol::T_CALL, cseqid); LogSender_GetLog_pargs args; args.logname = &logname; args.write(oprot_); oprot_->writeMessageEnd(); oprot_->getTransport()->writeEnd(); oprot_->getTransport()->flush(); } void LogSenderClient::recv_GetLog(std::string& _return) { int32_t rseqid = 0; std::string fname; ::apache::thrift::protocol::TMessageType mtype; iprot_->readMessageBegin(fname, mtype, rseqid); if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { ::apache::thrift::TApplicationException x; x.read(iprot_); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); throw x; } if (mtype != ::apache::thrift::protocol::T_REPLY) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } if (fname.compare("GetLog") != 0) { iprot_->skip(::apache::thrift::protocol::T_STRUCT); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); } LogSender_GetLog_presult result; result.success = &_return; result.read(iprot_); iprot_->readMessageEnd(); iprot_->getTransport()->readEnd(); if (result.__isset.success) { // _return pointer has now been filled return; } throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "GetLog failed: unknown result"); }
阅读上面的代码,可以看出,RPC函数GetLog()实际上被转化成了两个函数:send_GetLog和recv_GetLog,分别用于发送数据和接收结果。数据是以消息的形式表示的,消息头部是RPC函数名,消息内容是RPC函数的参数。
Thrift实际上是实现了C/S模式,通过代码生成工具将接口定义文件生成服务器端和客户端代码(可以为不同语言),从而实现服务端和客户端跨语言的支持。用户在Thirft描述文件中声明自己的服务,这些服务经过编译后会生成相应语言的代码文件,然后用户实现服务(客户端调用服务,服务端提供服务)。其中protocol(协议层, 定义数据传输格式,可以为二进制或者XML等)和transport(传输层,定义数据传输方式,可以为TCP/IP传输,内存共享或者文件共享等)被用作运行时库。
Thrift 脚本可定义的数据类型包括以下几种类型:
基本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:未知编码文本或二进制字符串,对应 Java 的 String
结构体类型:
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean
容器类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap
异常类型:
exception:对应 Java 的 Exception
服务类型:
service:对应服务的类
Thrift可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目/产品中的实际需求。常用协议有以下几种:
TBinaryProtocol 二进制编码格式进行数据传输
TCompactProtocol 高效率的、密集的二进制编码格式进行数据传输
TJSONProtocol 使用 JSON 的数据编码协议进行数据传输
TSimpleJSONProtocol 只提供 JSON 只写的协议,适用于通过脚本语言解析
TDebugProtocol – 使用易懂的可读的文本格式,以便于debug
常用的传输层有以下几种:
TServerTransport 使用阻塞式 I/O 进行传输,是最常见的模式
TFramedTransport 使用非阻塞方式,按块的大小进行传输
若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型
TFileTransport – 以文件形式进行传输
TNonblockingTransport 使用非阻塞方式,用于构建异步客户端
TMemoryTransport 将内存用于I/O
TZlibTransport 使用zlib进行压缩,与其他传输方式联合使用。
常见的服务端类型有以下几种:
TSimpleServer 单线程服务器端使用标准的阻塞式 I/O
TThreadPoolServer 多线程服务器端使用标准的阻塞式 I/O
TNonblockingServer 多线程服务器端使用非阻塞式 I/O(需使用TFramedTransport数据传输方式)
Client负责做负载均衡和容灾,一般情况下使用random来选择proxy就可以了。某个proxy连接不上的话,由客户端自动另外选择一个。
Proxy部署可以比较灵活,可以在某一类service前面单独部署proxy,也可以在多个类别的service前面部署proxy,一般根据service被调用的频率或热点情况来调整。
Service的负载均衡可以由proxy来负责,service定时上报自身负载和运行情况,proxy根据一定的策略来进行调度;或proxy也可以采用第三方负载均衡组件来分发对service的调用,比如腾讯的L5等。
与thrift类似的开源RPC框架还有google的protocal buffer,它虽然支持的语言比较少,但效率更高,因而受到越来越多的关注。
由于thrift开源时间很早,经受了时间的验证,因而许多系统更愿意采用thrift,如Hadoop,Cassandra等。
附:thrift与protocal buffer比较
从上面的比较可以看出,thrift胜在“丰富的特性“上,而protocal buffer胜在“文档化”非常好上。在具体实现上,它们非常类似,都是使用唯一整数标记字段域,这就使得增加和删除字段与不会破坏已有的代码。
它们的最大区别是thrift支持完整的client/server RPC框架,而protocal buffer只会产生接口,具体实现,还需要用户做大量工作。
另外,从序列化性能上比较,Protocal Buffer要远远优于thrift,具体可参考:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608
Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架
标签:
原文地址:http://www.cnblogs.com/linuxbug/p/5635154.html