本文主要探讨了消息总线支持Thrift RPC的实现过程。鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API。然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施。
Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python,PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。
做这件事情的初衷是RabbitMQ可以用于模拟request/response这样的通信模型,而这个通信模型就是通常C/S以及B/S架构的通信模型。并且因为RPC的流行,其官方java client已经提供了对基于JSON(文本协议)的RPC的实现。而Thrift本身是个RPC框架,提供跨语言、多协议、多传输通信机制的实现。如果能将两者衔接起来,消息总线对RPC的支持无疑更加完善。
Thrift的实现是基于类似TCP/IP的多层协议栈模型。它的特点是对等通信,逻辑分离,分层解耦。如下图:
在协议层,目前Thrift支持众多的协议,这些协议大致分为两类:
public void flush() throws TTransportException { byte[] data = this.reqMsgStream.toByteArray(); this.reqMsgStream.reset(); byte[] responseData = new byte[0]; try { responseData = this.client.primitiveRequest(secret, target, data, token, timeout); } catch (Exception e) { ExceptionHelper.logException(logger, e, "[flush]"); } this.respMsgStream = new ByteArrayInputStream(responseData); }
public void testThriftRpc() throws Exception { TTransport transport = new TAMQPClientTransport(this.client, "kliwhiduhaiucvarkjajksdbfkjabw", "emapDemoRpcResponse", "klasehnfkljashdnflhkjahwlekdjf", 10000); transport.open(); TProtocol protocol = new TJSONProtocol(transport); CalcService.Client client = new CalcService.Client(protocol); int result = client.calcSum(); logger.info(result); transport.close(); }
MessagebusSinglePool singlePool = new MessagebusSinglePool(host, port);; Messagebus client = singlePool.getResource(); //server code WrappedRpcServer rpcServer = null; try { TProcessor processor = new CalcService.Processor(new CalcServiceImpl()); TProtocolFactory inProtocolFactory = new TJSONProtocol.Factory(); TProtocolFactory outProtocolFactory = new TJSONProtocol.Factory(); rpcServer = client.buildRpcServer("mshdfjbqwejhfgasdfbjqkygaksdfa", new ThriftMessageHandler(processor, inProtocolFactory, outProtocolFactory)); rpcServer.mainLoop(); } finally { rpcServer.close(); singlePool.returnResource(client); singlePool.destroy(); }
消息总线提供了一个API:buildRpcServer,它构建出一个封装后的WrappedRpcServer。该WrappedRpcServer封装了上文提到的RabbitMQ java client自带的RpcServer。为什么要封装?主要的原因还是信息隐藏。其实,如果只接入RabbitMQ,不封装是没有问题的。但现在的目的是接入消息总线,而消息总线在RabbitMQjava client之上又封装了一层,屏蔽了一些不必要的设置,而这些设置恰好又是构建这个RpcServer类的实例所必备的参数(比如 queue name,channel等)。
因此这里我们基于组合的方式将RpcServer从WrappedRpcServer类的构造器注入进来,使得WrappedRpcServer成为RpcServer的代理,WrappedRpcServer的构造器访问标识符被设置为private,这是因为我们在消息总线内部构建了RpcServer的实例,然后通过反射机制来构造WrappedRpcServer的实例。看代码:public WrappedRpcServer buildRpcServer(String secret, final IRpcMessageProcessor rpcMsgProcessor) { Node source = this.getContext().getConfigManager().getNodeView(secret).getCurrentQueue(); try { RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) { @Override public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) { return rpcMsgProcessor.onRpcMessage(request.getBody()); } }; Constructor<WrappedRpcServer> rpcServerConstructor = WrappedRpcServer.class.getDeclaredConstructor(RpcServer.class); rpcServerConstructor.setAccessible(true); WrappedRpcServer wrappedRpcServer = rpcServerConstructor.newInstance(aServer); rpcServerConstructor.setAccessible(false); return wrappedRpcServer;
public interface IRpcMessageProcessor { public byte[] onRpcMessage(byte[] in); }
public byte[] onRpcMessage(byte[] inMsg) { InputStream in = new ByteArrayInputStream(inMsg); OutputStream out = new ByteArrayOutputStream(); TTransport transport = new TIOStreamTransport(in, out); TProtocol inProtocol = inProtocolFactory.getProtocol(transport); TProtocol outProtocol = outProtocolFactory.getProtocol(transport); try { processor.process(inProtocol, outProtocol); return ((ByteArrayOutputStream) out).toByteArray(); } catch (TException e) { ExceptionHelper.logException(logger, e, "onRpcMessage"); throw new RuntimeException(e); } finally { transport.close(); } }
RpcServer aServer = new RpcServer(this.getContext().getChannel(), source.getValue()) { @Override public byte[] handleCall(QueueingConsumer.Delivery request, AMQP.BasicProperties replyProperties) { return rpcMsgProcessor.onRpcMessage(request.getBody()); } };
原文地址:http://blog.csdn.net/yanghua_kobe/article/details/45250505