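(Facebook) thrift, (Hadoop) avro and (Google) protobuf (gRPC) are among the more eye-catching high-performance serialization/RPC frameworks of recent years. Dubbo does ship with thrift support, but it depends on an old version (only 0.8.0 is supported) and it extends the protocol, so what you get is not the native thrift protocol.
Someone on GitHub has already extended dubbo to support native thrift, but that approach takes far too much code. A single class is actually enough:
Thrift2Protocol.java: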
package com.alibaba.dubbo.rpc.protocol.thrift2;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

/**
 * Adds "native thrift" support to dubbo-rpc
 * by 杨俊明 (http://yjmyzz.cnblogs.com/)
 */
public class Thrift2Protocol extends AbstractProxyProtocol {

    public static final int DEFAULT_PORT = 33208;

    private static final Logger logger = LoggerFactory.getLogger(Thrift2Protocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        TNonblockingServer.Args tArgs = null;
        String iFace = "$Iface";
        String processor = "$Processor";
        String typeName = type.getName();

        if (typeName.endsWith(iFace)) {
            // Derive Xxx$Processor from the thrift-generated interface Xxx$Iface
            String processorClsName = typeName.substring(0, typeName.length() - iFace.length()) + processor;
            try {
                Class<?> clazz = Class.forName(processorClsName);
                Constructor<?> constructor = clazz.getConstructor(type);
                TProcessor tprocessor = (TProcessor) constructor.newInstance(impl);
                TNonblockingServerSocket transport = new TNonblockingServerSocket(url.getPort());
                tArgs = new TNonblockingServer.Args(transport);
                tArgs.processor(tprocessor);
                // Framed transport + compact protocol; the client in doRefer must use the same pair
                tArgs.transportFactory(new TFramedTransport.Factory());
                tArgs.protocolFactory(new TCompactProtocol.Factory());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
            }
        }

        // tArgs stays null when the exported type is not a thrift-generated $Iface
        if (tArgs == null) {
            logger.error("Fail to create thrift server(" + url + ") due to null args");
            throw new RpcException("Fail to create thrift server(" + url + ") due to null args");
        }

        final TServer thriftServer = new TNonblockingServer(tArgs);

        // serve() blocks until stop() is called, so run it on its own thread
        new Thread(new Runnable() {
            public void run() {
                logger.info("Start Thrift Server");
                thriftServer.serve();
                logger.info("Thrift server stopped.");
            }
        }).start();

        // The returned Runnable is run when the service is unexported
        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Thrift Server");
                    thriftServer.stop();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        String iFace = "$Iface";
        String client = "$Client";
        String typeName = type.getName();

        // Fail fast instead of returning a null client for a non-thrift interface
        if (!typeName.endsWith(iFace)) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): "
                    + typeName + " is not a thrift " + iFace + " interface");
        }

        try {
            // Derive Xxx$Client from the thrift-generated interface Xxx$Iface
            String clientClsName = typeName.substring(0, typeName.length() - iFace.length()) + client;
            Class<?> clazz = Class.forName(clientClsName);
            Constructor<?> constructor = clazz.getConstructor(TProtocol.class);

            // Must match the server side: framed transport + compact protocol
            TSocket tSocket = new TSocket(url.getHost(), url.getPort());
            TTransport transport = new TFramedTransport(tSocket);
            TProtocol protocol = new TCompactProtocol(transport);
            T thriftClient = type.cast(constructor.newInstance(protocol));
            transport.open();
            logger.info("thrift client opened for service(" + url + ")");
            return thriftClient;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }
}
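All it takes is overriding the two abstract methods of the parent class AbstractProxyProtocol, doExport and doRefer. doExport exposes the RPC service: it starts the thrift server, and dubbo calls it when the service provider starts up. doRefer is called once the dubbo service consumer has discovered the service, and returns the corresponding RPC client.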
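Both methods rely on the naming convention of thrift-generated Java code: the service interface is a nested type `Xxx$Iface`, and `Xxx$Processor` and `Xxx$Client` are generated alongside it. The sketch below isolates that lookup using only the JDK. `HelloService`, `siblingClassName` and `newSibling` are hypothetical names made up for illustration, not thrift or dubbo APIs, and the stand-in `Client` takes a plain `String` where a real generated client takes a `TProtocol`.

```java
import java.lang.reflect.Constructor;

class ThriftNamingSketch {

    /** Hypothetical stand-in for a thrift-generated service class. */
    public static class HelloService {
        public interface Iface {
            String hello(String name);
        }

        public static class Client implements Iface {
            private final String endpoint;

            public Client(String endpoint) {
                this.endpoint = endpoint;
            }

            public String hello(String name) {
                return "hello " + name + " via " + endpoint;
            }
        }
    }

    /** Maps Xxx$Iface to Xxx$<sibling>, rejecting anything that is not an $Iface. */
    static String siblingClassName(Class<?> iface, String sibling) {
        String suffix = "$Iface";
        String name = iface.getName();
        if (!name.endsWith(suffix)) {
            throw new IllegalArgumentException(name + " is not a generated " + suffix + " interface");
        }
        return name.substring(0, name.length() - suffix.length()) + "$" + sibling;
    }

    /** Loads the sibling class and calls its single-argument public constructor. */
    static <T> T newSibling(Class<T> iface, String sibling, Class<?> argType, Object arg) {
        try {
            Class<?> clazz = Class.forName(siblingClassName(iface, sibling));
            Constructor<?> ctor = clazz.getConstructor(argType);
            return iface.cast(ctor.newInstance(arg));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + sibling + " for " + iface.getName(), e);
        }
    }

    public static void main(String[] args) {
        HelloService.Iface client =
                newSibling(HelloService.Iface.class, "Client", String.class, "localhost:33208");
        System.out.println(client.hello("dubbo"));
    }
}
```

Stripping the suffix by length rather than with `indexOf` keeps the lookup correct even if `$Iface` happens to occur earlier in the class name.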
Following the same idea, avro is just as easy to integrate:
AvroProtocol.java:
package com.alibaba.dubbo.rpc.protocol.avro;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.reflect.ReflectResponder;

import java.net.InetSocketAddress;

/**
 * Adds avro support to dubbo-rpc
 * by 杨俊明 (http://yjmyzz.cnblogs.com/)
 */
public class AvroProtocol extends AbstractProxyProtocol {

    public static final int DEFAULT_PORT = 40881;

    private static final Logger logger = LoggerFactory.getLogger(AvroProtocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        final Server server = new NettyServer(new ReflectResponder(type, impl),
                new InetSocketAddress(url.getHost(), url.getPort()));
        server.start();

        return new Runnable() {
            public void run() {
                try {
                    logger.info("Close Avro Server");
                    server.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {

        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
            T ref = ReflectRequestor.getClient(type, client);
            logger.info("Create Avro Client");
            return ref;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }
}
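Both protocol classes share the same export lifecycle: doExport brings a server up and hands back a Runnable that shuts it down when the service is unexported. Thrift's serve() blocks the calling thread, which is why Thrift2Protocol runs it on a separate thread. The following JDK-only sketch shows just that pattern; `BlockingServer` and `export` are hypothetical stand-ins, not thrift, avro or dubbo classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ExportLifecycleSketch {

    /** Hypothetical stand-in for a server whose serve() blocks until stop() is called. */
    static class BlockingServer {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private final CountDownLatch finished = new CountDownLatch(1);

        void serve() {
            started.countDown();
            try {
                stopRequested.await();          // blocks, like a real accept loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        }

        void stop() {
            stopRequested.countDown();
        }

        boolean awaitStarted(long millis) {
            return await(started, millis);
        }

        boolean awaitFinished(long millis) {
            return await(finished, millis);
        }

        private static boolean await(CountDownLatch latch, long millis) {
            try {
                return latch.await(millis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Starts the server on a background thread and returns the shutdown hook. */
    static Runnable export(final BlockingServer server) {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                server.serve();
            }
        }, "sketch-server");
        thread.setDaemon(true);                 // do not keep the JVM alive for the sketch
        thread.start();
        return new Runnable() {
            public void run() {
                server.stop();
            }
        };
    }

    public static void main(String[] args) {
        BlockingServer server = new BlockingServer();
        Runnable unexport = export(server);
        System.out.println("started:  " + server.awaitStarted(1000));
        unexport.run();
        System.out.println("finished: " + server.awaitFinished(1000));
    }
}
```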
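Don't forget to add a file named com.alibaba.dubbo.rpc.Protocol under META-INF/dubbo/internal (the thrift2 module needs an equivalent entry pointing at Thrift2Protocol), with the following content: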
avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol
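To make the role of that file concrete: each line maps a protocol name to its implementing class, and the name is what a dubbo protocol configuration refers to. The sketch below parses such content with plain JDK code. It is only an illustration of the `name=class` format, not dubbo's actual extension loader; `ExtensionFileSketch` and `parse` are hypothetical names, and the `thrift2` key in the sample is an assumption based on the module name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExtensionFileSketch {

    /** Parses "name=className" lines, ignoring blank lines and '#' comments. */
    static Map<String, String> parse(String content) {
        Map<String, String> extensions = new LinkedHashMap<String, String>();
        for (String raw : content.split("\\r?\\n")) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);   // strip trailing comment
            }
            line = line.trim();
            int eq = line.indexOf('=');
            if (line.isEmpty() || eq <= 0) {
                continue;                         // this sketch skips lines without a name
            }
            extensions.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return extensions;
    }

    public static void main(String[] args) {
        String content =
                "avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol\n"
              + "thrift2=com.alibaba.dubbo.rpc.protocol.thrift2.Thrift2Protocol  # assumed key\n";
        for (Map.Entry<String, String> e : parse(content).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```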
Finally, a few words on packaging everything into the dubbo jar.
In dubbo-rpc/pom.xml, add the two new modules:
<modules>
    ...
    <module>dubbo-rpc-avro</module>
    ...
    <module>dubbo-rpc-thrift2</module>
    ...
</modules>
Then, in dubbo/pom.xml:
<artifactSet>
    <includes>
        ...
        <include>com.alibaba:dubbo-rpc-api</include>
        <include>com.alibaba:dubbo-rpc-avro</include>
        ...
        <include>com.alibaba:dubbo-rpc-thrift2</include>
        ...
    </includes>
</artifactSet>
The dependencies section needs new entries as well:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo-rpc-thrift2</artifactId>
    <version>${project.parent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>dubbo-rpc-avro</artifactId>
    <version>${project.parent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-ipc</artifactId>
        </exclusion>
    </exclusions>
</dependency>
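The dubbo-xxx.jar built this way includes the newly added protocols. As for Google's protobuf, it is currently in the 3.x beta stage; once a stable release is out, I will look at integrating it too.
The code above has been pushed to GitHub: https://github.com/yjmyzz/dubbox (version 2.8.4a)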
dubbo/dubbox: adding native thrift and avro support
Original article: http://www.cnblogs.com/yjmyzz/p/dubbo-pritimive-thrift-avro-support.html