本文主要探讨了消息总线支持Thrift RPC的实现过程。鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施。
Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python,PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。
做这件事情的初衷是RabbitMQ可以用于模拟request/response这样的通信模型,而这个通信模型就是通常C/S以及B/S架构的通信模型。并且因为RPC的流行,其官方java client已经提供了对基于JSON(文本协议)的RPC的实现。而Thrift本身是个RPC框架,提供跨语言、多协议、多传输通信机制的实现。如果能将两者衔接起来,消息总线对RPC的支持无疑更加完善。
Thrift的实现是基于类似TCP/IP的多层协议栈模型。它的特点是对等通信,逻辑分离,分层解耦。如下图:
在协议层,目前Thrift支持众多的协议,这些协议大致分为两类:
public void flush() throws TTransportException {
byte[] data = this.reqMsgStream.toByteArray();
this.reqMsgStream.reset();
byte[] responseData = new byte[0];
try {
responseData = this.client.primitiveRequest(secret, target, data, token, timeout);
} catch (Exception e) {
ExceptionHelper.logException(logger, e, "[flush]");
}
this.respMsgStream = new ByteArrayInputStream(responseData);
}public void testThriftRpc() throws Exception {
TTransport transport = new TAMQPClientTransport(this.client,
"kliwhiduhaiucvarkjajksdbfkjabw",
"emapDemoRpcResponse",
"klasehnfkljashdnflhkjahwlekdjf",
10000);
transport.open();
TProtocol protocol = new TJSONProtocol(transport);
CalcService.Client client = new CalcService.Client(protocol);
int result = client.calcSum();
logger.info(result);
transport.close();
}MessagebusSinglePool singlePool = new MessagebusSinglePool(host, port);;
Messagebus client = singlePool.getResource();
//server code
WrappedRpcServer rpcServer = null;
try {
TProcessor processor = new CalcService.Processor(new CalcServiceImpl());
TProtocolFactory inProtocolFactory = new TJSONProtocol.Factory();
TProtocolFactory outProtocolFactory = new TJSONProtocol.Factory();
rpcServer = client.buildRpcServer("mshdfjbqwejhfgasdfbjqkygaksdfa",
new ThriftMessageHandler(processor, inProtocolFactory, outProtocolFactory));
rpcServer.mainLoop();
} finally {
rpcServer.close();
singlePool.returnResource(client);
singlePool.destroy();
}消息总线提供了一个API:buildRpcServer,它构建出一个封装后的WrappedRpcServer。该WrappedRpcServer封装了上文提到的RabbitMQ java client自带的RpcServer。为什么要封装?主要的原因还是信息隐藏。其实,如果只接入RabbitMQ,不封装是没有问题的。但现在的目的是接入消息总线,而消息总线在RabbitMQjava client之上又封装了一层,屏蔽了一些不必要的设置,而这些设置恰好又是构建这个RpcServer类的实例所必备的参数(比如 queue name,channel等)。
因此这里我们基于组合的方式将RpcServer从WrappedRpcServer类的构造器注入进来,使得WrappedRpcServer成为RpcServer的代理,WrappedRpcServer的构造器访问标识符被设置为private,这是因为我们在消息总线内部构建了RpcServer的实例,然后通过反射机制来构造WrappedRpcServer的实例。看代码:public WrappedRpcServer buildRpcServer(String secret, final IRpcMessageProcessor rpcMsgProcessor) {
Node source = this.getContext().getConfigManager().getNodeView(secret).getCurrentQueue();
try {
RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) {
@Override
public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) {
return rpcMsgProcessor.onRpcMessage(request.getBody());
}
};
Constructor<WrappedRpcServer> rpcServerConstructor = WrappedRpcServer.class.getDeclaredConstructor(RpcServer.class);
rpcServerConstructor.setAccessible(true);
WrappedRpcServer wrappedRpcServer = rpcServerConstructor.newInstance(aServer);
rpcServerConstructor.setAccessible(false);
return wrappedRpcServer;public interface IRpcMessageProcessor {
public byte[] onRpcMessage(byte[] in);
}public byte[] onRpcMessage(byte[] inMsg) {
InputStream in = new ByteArrayInputStream(inMsg);
OutputStream out = new ByteArrayOutputStream();
TTransport transport = new TIOStreamTransport(in, out);
TProtocol inProtocol = inProtocolFactory.getProtocol(transport);
TProtocol outProtocol = outProtocolFactory.getProtocol(transport);
try {
processor.process(inProtocol, outProtocol);
return ((ByteArrayOutputStream) out).toByteArray();
} catch (TException e) {
ExceptionHelper.logException(logger, e, "onRpcMessage");
throw new RuntimeException(e);
} finally {
transport.close();
}
}RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) {
@Override
public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) {
return rpcMsgProcessor.onRpcMessage(request.getBody());
}
};原文地址:http://blog.csdn.net/yanghua_kobe/article/details/45250505