Introduction
Thrift is a software framework for developing scalable, cross-language services. It combines a powerful software stack with a code-generation engine to build services that work efficiently and seamlessly across C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.
Thrift was originally developed at Facebook for RPC communication between the various languages used inside its systems. Facebook contributed it to the Apache Foundation in 2007, and it entered the Apache Incubator in May 2008.
It supports RPC communication across many languages: for example, a PHP client can construct an object and invoke a service method implemented in Java, a cross-language client/server RPC call.
Thrift lets you define data types and service interfaces in a simple definition file. Taking that file as input, the compiler generates code for building RPC clients and servers that communicate seamlessly across programming languages.
A Small Example
Using a UserService as the example, this section walks through how Thrift is used and how it works.
service.thrift
struct User {
    1:i64 id,
    2:string name,
    3:i64 timestamp,
    4:bool vip
}

service UserService {
    User getById(1:i64 id)
}
You can describe your own Java service in a ".thrift" file and hand it to service consumers, who can then generate their own API files from it. Thrift currently supports most mainstream languages. Note that, because struct/service definitions must be compatible with the conventions of many languages, Thrift supports only a limited set of basic data types (such as i32, i64, and string), and methods in a service definition may not share a name even when their parameter lists differ (not every language supports overloading the way Java does).
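For example, since overloading is rejected by the compiler, each method variant needs its own name in the IDL (a hypothetical sketch extending the definition above):

```thrift
service UserService {
    User getById(1:i64 id)
    // An overloaded "User getById(1:i64 id, 2:bool withDetail)" would be
    // rejected; the variant must be named distinctly:
    User getByIdWithDetail(1:i64 id, 2:bool withDetail)
}
```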
Generating the API Files
First download and install the Thrift compiler; on Windows, for example, download thrift.exe. Be warned that API files generated by different compiler versions may not be compatible. This example uses thrift-0.9.0.exe; the "--gen" option specifies the target language. Here we generate the Java client API.
Command line:
// On Windows, output the API files into the "service" directory (the directory must already exist)
> thrift.exe --gen java -o service service.thrift
One thing to be clear about: unlike some other RPC frameworks, Thrift bakes the "calling procedure" into the generated API files (it is hard-coded) rather than resolving method calls or parameters dynamically at runtime.
The UserService implementation class
Java code:
public class UserServiceImpl implements UserService.Iface {
    @Override
    public User getById(long id) {
        System.out.println("invoke...id:" + id);
        return new User(); // for test
    }
}
A Brief Look at How It Works
User.java: Thrift's code-generation capability is fairly limited; for instance, a struct may only use simple data types (no Date, Collection<?>, and so on). Still, we can see from User that the generated class implements both the "Serializable" and "TBase" interfaces. Serializable indicates that instances of this class must be serialized before being sent over the network; to avoid interfering with Java's own serialization mechanism, the class also overrides readObject and writeObject, although that is of no help to Thrift itself.
The TBase interface is what Thrift uses for serialization and deserialization; its two core methods are read and write. In the thrift file above, every attribute in the struct definition carries a field number, such as 1:id. When serializing, Thrift walks the fields in numbered order and writes each field's ID and type followed by its value to the stream; deserialization works the same way (see the implementations of read and write for details).
Java code:
// The read method reads the fields one by one, by field ID, until the "struct" object is fully populated.
// The write method is very similar: it writes the fields to the stream one by one, in field-ID order.
while (true) {
    schemeField = iprot.readFieldBegin();
    if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
        break;
    }
    switch (schemeField.id) {
        case 1: // ID
            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
                struct.id = iprot.readI64();
                struct.setIdIsSet(true);
            } else {
                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
            }
            break;
        case 2: // NAME
            ..
    }
}
Because Thrift serializes and deserializes instance data by field number, the order of data in the input and output streams is strictly defined. Field numbers must be unique within a struct, although they need not start from 1; duplicate numbers make it impossible to generate the API files at all. This also means that if you change a struct definition in the thrift file, you must regenerate the client API, or the service can no longer be used safely (you may get errors, or silently corrupted data). Unlike Java's built-in serialization, Thrift's mechanism carries no extra class metadata, which makes it better suited to network transport and noticeably more efficient.
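The field-number-based wire format described above can be illustrated with a small standalone sketch (plain Java, no Thrift dependency; the layout here is simplified and is not the real TBinaryProtocol format):

```java
import java.io.*;
import java.util.*;

// Simplified sketch of tag-based serialization: each field is written as
// (field id, value) and the struct ends with a STOP marker. Readers route
// on the field id, which is why renumbering fields breaks compatibility.
public class TaggedCodec {
    static final short STOP = 0;

    public static byte[] writeUser(long id, String name) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeShort(1); out.writeLong(id);   // field 1: id
        out.writeShort(2); out.writeUTF(name);  // field 2: name
        out.writeShort(STOP);                   // end of struct
        return buf.toByteArray();
    }

    public static Map<Short, Object> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<Short, Object> fields = new HashMap<>();
        while (true) {
            short fieldId = in.readShort();
            if (fieldId == STOP) break;
            switch (fieldId) {
                case 1: fields.put(fieldId, in.readLong()); break;
                case 2: fields.put(fieldId, in.readUTF()); break;
                default: throw new IOException("unknown field " + fieldId);
            }
        }
        return fields;
    }
}
```

The real protocol also writes a type byte per field so unknown fields can be skipped rather than rejected.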
UserService.Client: the generated UserService contains a static Client class, a typical proxy class that implements every method of UserService. Developers call the API methods on Client to interact with the Thrift server; it takes care of sending requests and receiving responses over the socket connection to the server.
Note that every Client method call happens over a socket connection, which means you must establish a working TCP connection to the Thrift server before consuming the service through Client (a code example follows later).
1) Sending a request:
Java code:
// See: TServiceClient
// Sends the request data stream when an API method is called
protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); // first write the method name and seqid_
    args.write(oprot_); // serialize the arguments
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
}

protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    // if execution raised an exception on the server
    if (msg.type == TMessageType.EXCEPTION) {
        TApplicationException x = TApplicationException.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
    }
    // check that the seqid matches
    if (msg.seqid != seqid_) {
        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_); // deserialize
    iprot_.readMessageEnd();
}
Thrift provides a simple safety check: each method call is stamped on the client with a seqid, a locally incremented ID that is appended to the stream of the TCP request; the server echoes the same seqid back in its response. The client can then verify that a response corresponds to its request; if responses arrive out of order, the call fails with an exception.
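That check can be reduced to the following standalone sketch (hypothetical names; the real logic lives in TServiceClient.receiveBase shown above):

```java
// Minimal sketch of Thrift's sequence-id check: the client stamps each
// request with an incrementing id and rejects any response whose id
// does not match the one it just sent.
public class SeqIdChecker {
    private int seqid;

    // Stamped into the outgoing message, like ++seqid_ in sendBase.
    public int nextRequestId() {
        return ++seqid;
    }

    // Returns true only if the response matches the request just sent.
    public boolean accept(int responseSeqId) {
        return responseSeqId == seqid;
    }
}
```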
Java code:
// See: TBaseProcessor.java
@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name); // look up the "inner class" by method name
    if (fn == null) {
        TProtocolUtil.skip(in, TType.STRUCT);
        in.readMessageEnd();
        TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'");
        out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
        x.write(out); // serialize the error response and write it straight out
        out.writeMessageEnd();
        out.getTransport().flush();
        return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
}
The generated UserService.Processor class is the server side's "proxy" for handling requests: the server reads the method name and argument list from the socket and hands them to the Processor. Unlike many other RPC frameworks, Thrift does not use anything like reflection to invoke the method; instead it generates one "inner class" per method of UserService:
Java code:
public static class getById<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getById_args> {
    public getById() {
        super("getById"); // "getById" is the method's identifier
    }

    public getById_args getEmptyArgsInstance() {
        return new getById_args();
    }

    protected boolean isOneway() {
        return false;
    }

    // the actual handler method
    public getById_result getResult(I iface, getById_args args) throws org.apache.thrift.TException {
        getById_result result = new getById_result();
        result.success = iface.getById(args.id);
        return result;
    }
}
These inner classes are put into a map when the Processor is initialized; from then on a handler can be looked up by method name and its getResult method invoked.
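The name-to-handler dispatch can be sketched without any Thrift classes (a simplified stand-in for ProcessFunction and the processMap; names here are illustrative):

```java
import java.util.*;
import java.util.function.Function;

// Sketch of the processor's dispatch table: each service method becomes
// an entry keyed by method name, so a request is routed by map lookup
// rather than reflection.
public class Dispatcher {
    private final Map<String, Function<long[], String>> processMap = new HashMap<>();

    public Dispatcher() {
        // Analogue of processMap.put("getById", new getById())
        processMap.put("getById", args -> "User#" + args[0]);
    }

    public String process(String methodName, long... args) {
        Function<long[], String> fn = processMap.get(methodName);
        if (fn == null) {
            // Analogue of TApplicationException.UNKNOWN_METHOD
            return "EXCEPTION: Invalid method name: '" + methodName + "'";
        }
        return fn.apply(args);
    }
}
```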
Java code:
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
        // register the handler in the map
        processMap.put("getById", new getById());
        return processMap;
    }
    ....
}
Java code:
// TThreadPoolServer
public void serve() {
    try {
        // start listening
        serverTransport_.listen();
    } catch (TTransportException ttx) {
        LOGGER.error("Error occurred during listening.", ttx);
        return;
    }

    // Run the preServe event
    if (eventHandler_ != null) {
        eventHandler_.preServe();
    }

    stopped_ = false;
    setServing(true);
    // loop until the server is shut down
    while (!stopped_) {
        int failureCount = 0;
        try {
            // accept a client socket connection; each new connection is
            // wrapped in a Runnable and submitted to the thread pool.
            TTransport client = serverTransport_.accept();
            WorkerProcess wp = new WorkerProcess(client);
            executorService_.execute(wp);
        } catch (TTransportException ttx) {
            if (!stopped_) {
                ++failureCount;
                LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
            }
        }
    }
    //....
}
The design of the Thrift server side is equally direct. Once the service server starts, it listens for socket connections in a blocking fashion (see TThreadPoolServer). Each accepted socket is wrapped and handed to the thread pool, so in essence one socket connection maps to one worker thread, and that thread handles every request on its socket until the socket closes.
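That one-socket-per-worker-thread model can be reproduced with plain sockets (an illustrative echo server, not the actual TThreadPoolServer code; all names are hypothetical):

```java
import java.io.*;
import java.net.*;
import java.util.concurrent.*;

// Sketch of the TThreadPoolServer model: a blocking accept loop hands each
// new connection to a pooled worker thread, which serves that socket alone
// until it closes.
public class ThreadPoolEchoServer {
    private final ServerSocket serverSocket;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    public ThreadPoolEchoServer() throws IOException {
        serverSocket = new ServerSocket(0); // ephemeral port for the demo
    }

    public int port() { return serverSocket.getLocalPort(); }

    public void start() {
        new Thread(() -> {
            try {
                while (true) {
                    Socket client = serverSocket.accept();  // blocking accept
                    pool.execute(() -> serve(client));      // one worker per connection
                }
            } catch (IOException ignored) { /* server socket closed */ }
        }).start();
    }

    private void serve(Socket client) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println("echo:" + line);  // stand-in for processor.process(...)
            }
        } catch (IOException ignored) {
        }
    }

    public void stop() throws IOException {
        serverSocket.close();
        pool.shutdownNow();
    }
}
```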
Java code:
// See: WorkerProcess
while (true) {

    if (eventHandler != null) {
        eventHandler.processContext(connectionContext, inputTransport, outputTransport);
    }

    if (stopped_ || !processor.process(inputProtocol, outputProtocol)) {
        break;
    }
}
When there are not many socket connections, TThreadPoolServer has no serious performance problems and can be tuned simply by adjusting the number of threads in the pool. With a large number of connections we have to fall back on TThreadedSelectorServer, which is built on NIO and asynchronous in nature, greatly improving the server's concurrency. Admittedly, "asynchronous" is an awkward fit for Thrift in most scenarios, since the client still blocks and, under high concurrency, the blocking time is unbounded. Even so, the selector-based server genuinely improves server concurrency and, to a degree, throughput, making it one of the more reliable ways to optimize a Thrift server.
Java code:
public class UserServiceClient {

    public void startClient() {
        TTransport transport;
        try {
            transport = new TSocket("localhost", 1234);
            TProtocol protocol = new TBinaryProtocol(transport);
            UserService.Client client = new UserService.Client(protocol);
            transport.open();
            User user = client.getById(1000);
            ////
            transport.close();
        } catch (TTransportException e) {
            e.printStackTrace();
        } catch (TException e) {
            e.printStackTrace();
        }
    }

}
Java code:
public class Server {
    public void startServer() {
        try {
            TServerSocket serverTransport = new TServerSocket(1234);
            UserService.Processor process = new Processor(new UserServiceImpl());
            Factory portFactory = new TBinaryProtocol.Factory(true, true);
            Args args = new Args(serverTransport);
            args.processor(process);
            args.protocolFactory(portFactory);
            TServer server = new TThreadPoolServer(args);
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}
At this point you will have noticed that each service requires the server side to open its own ServerSocket. If you have many services, you will want to spread them across different physical servers as much as possible, because running too many ServerSocket processes on one machine is not a pleasant prospect; alternatively, you can merge several services into one.
The problem is never as simple as it looks: the finer-grained the services, the easier they are to deploy and scale, and the better they suit load balancing. How to deploy a service in a distributed fashion is covered later.
To recap the workflow:
1) Define the structs and the service API in a thrift file; other languages can generate API or class files from it.
2) Generate the API files with the Thrift compiler.
3) Implement the service on the Java (service provider) side.
4) Publish the implementation as a Thrift server, i.e. embed the service in a ServerSocket.
5) On the client side, open a socket, establish a TCP connection to the Thrift server, and invoke the remote interface through the Client proxy class.
Server-Side Development
A Thrift server is essentially a ServerSocket thread plus a processor. When a Thrift client establishes a connection, the processor parses the socket stream, uses the method name and argument list it finds there to invoke the corresponding method on the service implementation class, and writes the result (or exception) back to the socket.
Each server must create a ServerSocket listening on a local port, which imposes an extra requirement for distributed deployment: the client side needs to know which servers a given "service" is deployed on.
Design approach:
1) Each server uses a thread pool internally to improve concurrency.
2) Once a server starts successfully, it registers a service node with ZooKeeper, after which clients can observe the service's state.
3) The thrift-server service classes are configured through Spring.
ZooKeeper registration is optional.
1. pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.0.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <!--<exclusions>-->
        <!--<exclusion>-->
        <!--<groupId>log4j</groupId>-->
        <!--<artifactId>log4j</artifactId>-->
        <!--</exclusion>-->
        <!--</exclusions>-->
    </dependency>
    <!--
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.4</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>
This example uses apache-curator as the ZooKeeper client.
2. spring-thrift-server.xml
<!-- zookeeper -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181"></property>
    <property name="namespace" value="demo/thrift-service"></property>
</bean>
<bean id="serviceAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">
    <property name="zookeeper" ref="thriftZookeeper"></property>
</bean>
<bean id="userService" class="com.demo.service.UserServiceImpl"/>
<bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">
    <property name="service" ref="userService"></property>
    <property name="configPath" value="UserServiceImpl"></property>
    <property name="port" value="9090"></property>
    <property name="addressReporter" ref="serviceAddressReporter"></property>
</bean>
3. ThriftServiceServerFactory.java
Strictly speaking this class is not a factory; its main job is to wrap the given "service" and start a server for it. The "service" property holds the service implementation, and addressReporter specifies an action to run once the server has started successfully (for example, publishing the service's IP information to ZooKeeper).
Exactly what the current server's IP address is depends on the design. For example, some administrators like to write the machine's IP into a particular file under the OS, so an application that needs a reliable IP reads that file... You can implement your own ThriftServerIpTransfer to obtain the current server's IP.
To keep the XML configuration small, the factory uses reflection to construct the "Processor" class.
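That reflective construction can be sketched as follows; the Demo classes below are hypothetical stand-ins for the generated Service$Iface / Service$Processor pair, since the factory only knows the service class name from the XML:

```java
import java.lang.reflect.Constructor;

// Sketch: given only the service class name, load "<service>$Processor"
// and invoke its (Iface) constructor reflectively, so no service-specific
// code appears in the factory itself.
public class ProcessorBuilder {

    // Hypothetical generated-style classes standing in for UserService.
    public static class DemoService {
        public interface Iface { String ping(); }
        public static class Processor {
            final Iface iface;
            public Processor(Iface iface) { this.iface = iface; }
            public String handle() { return iface.ping(); }
        }
    }

    public static Object build(String serviceClassName, Object implementation) throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Class<?> ifaceClass = cl.loadClass(serviceClassName + "$Iface");
        Class<?> processorClass = cl.loadClass(serviceClassName + "$Processor");
        Constructor<?> ctor = processorClass.getConstructor(ifaceClass);
        return ctor.newInstance(implementation);
    }
}
```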
4. DynamicAddressReporter.java
ThriftServiceServerFactory has an optional addressReporter property. DynamicAddressReporter can register service information with ZooKeeper: once the server has started normally, it publishes the server's IP + port to ZooKeeper. Consumer clients can then fetch the server list from ZooKeeper and establish connections (or connection pools) to those servers, so the client only needs to know the ZooKeeper node name instead of configuring long lists of ip + port pairs.
public class DynamicAddressReporter implements ThriftServerAddressReporter {

    private CuratorFramework zookeeper;

    public DynamicAddressReporter() {}

    public DynamicAddressReporter(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    public void setZookeeper(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    @Override
    public void report(String service, String address) throws Exception {
        if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
            zookeeper.start();
            zookeeper.newNamespaceAwareEnsurePath(service);
        }
        zookeeper.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(service + "/i_", address.getBytes("utf-8"));
    }

    public void close() {
        zookeeper.close();
    }
}
5. Test class
public class ServiceMain {

    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");
            Thread.sleep(3000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Client-Side Development
As the service consumer, the Thrift client communicates over sockets and therefore has to deal with several issues:
1) The client needs the server's IP + port; with a distributed deployment it needs the IP + port list of every server.
2) For performance, the client cannot funnel concurrent requests through a single socket, nor can it create a socket per request; we need a connection-pool solution.
3) For Java developers, configuring the Thrift service through Spring is very convenient.
4) With ZooKeeper-based configuration management, the client no longer hard-codes server addresses; ZooKeeper can push the address list for each service.
5) Without a connection pool the thrift client cannot achieve good concurrency, so this article focuses on how to use a thrift-client connection pool.
1. pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.0.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <!--<exclusions>-->
        <!--<exclusion>-->
        <!--<groupId>log4j</groupId>-->
        <!--<artifactId>log4j</artifactId>-->
        <!--</exclusion>-->
        <!--</exclusions>-->
    </dependency>
    <!--
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.4</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>
2. spring-thrift-client.xml
ZooKeeper is optional here; developers can also point at the server directly by setting serverAddress.
<!-- fixedAddress -->
<!--
<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
    <property name="service" value="com.demo.service.UserService"></property>
    <property name="serverAddress" value="127.0.0.1:9090:2"></property>
    <property name="maxActive" value="5"></property>
    <property name="idleTime" value="10000"></property>
</bean>
-->
<!-- zookeeper -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181"></property>
    <property name="namespace" value="demo/thrift-service"></property>
</bean>
<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
    <property name="service" value="com.demo.service.UserService"></property>
    <property name="maxActive" value="5"></property>
    <property name="idleTime" value="1800000"></property>
    <property name="addressProvider">
        <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
            <property name="configPath" value="UserServiceImpl"></property>
            <property name="zookeeper" ref="thriftZookeeper"></property>
        </bean>
    </property>
</bean>
3. ThriftServiceClientProxyFactory.java
Because we want a connection pool on the client side, the client's method calls must go through a proxy. This class maintains a "Client" proxy: on each method call it borrows a "Client" object from the object pool and returns it to the pool once the call completes.
@SuppressWarnings("rawtypes")
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

    private String service;

    private String serverAddress;

    private Integer maxActive = 32; // maximum number of active connections

    // connection idle time in ms, default 3 minutes;
    // -1 disables idle eviction
    private Integer idleTime = 180000;

    private ThriftServerAddressProvider addressProvider;

    private Object proxyClient;

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setService(String service) {
        this.service = service;
    }

    public void setServerAddress(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
        this.addressProvider = addressProvider;
    }

    private Class objectClass;

    private GenericObjectPool<TServiceClient> pool;

    private PoolOperationCallBack callback = new PoolOperationCallBack() {

        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    @Override
    public void afterPropertiesSet() throws Exception {
        if (serverAddress != null) {
            addressProvider = new FixedAddressProvider(serverAddress);
        }
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // load the Iface interface
        objectClass = classLoader.loadClass(service + "$Iface");
        // load the Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(service + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[]{objectClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // borrow a pooled client for the duration of this call
                TServiceClient client = pool.borrowObject();
                try {
                    return method.invoke(client, args);
                } catch (Exception e) {
                    throw e;
                } finally {
                    pool.returnObject(client);
                }
            }
        });
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void close() {
        if (addressProvider != null) {
            addressProvider.close();
        }
    }
}
4. ThriftClientPoolFactory.java
The "Client" object pool holds instantiated Client objects; each Client is responsible for communicating with a Thrift server.
/**
 * Connection pool: thrift-client for Spring
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider addressProvider;

    private final TServiceClientFactory<TServiceClient> clientFactory;

    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.addressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory, PoolOperationCallBack callback) throws Exception {
        this.addressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = addressProvider.selector();
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TProtocol protocol = new TBinaryProtocol(tsocket);
        TServiceClient client = this.clientFactory.getClient(protocol);
        tsocket.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                //
            }
        }
        return client;
    }

    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                //
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
    }

    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        return pin.isOpen();
    }

    static interface PoolOperationCallBack {
        // invoked before a client is destroyed
        void destroy(TServiceClient client);
        // invoked after a client has been created successfully
        void make(TServiceClient client);
    }
}
5. DynamicAddressProvider.java
Using ZooKeeper as the provider of server addresses means the client no longer has to list a pile of ip + port pairs in its configuration files, and when the server side changes, clients need no reconfiguration.
/**
 * Obtains server addresses dynamically. Possible designs:
 * 1) periodically call a web service to fetch the addresses
 * 2) receive updates passively through an event/listener mechanism (e.g. based on MQ or NIO)
 * 3) use the zookeeper watcher mechanism to obtain the latest addresses
 * <p/>
 * This example uses zookeeper as the "config" center and the apache-curator
 * library to simplify zookeeper development.
 * The implementation below is for reference only.
 */
public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {

    private String configPath;

    private PathChildrenCache cachedPath;

    private CuratorFramework zookeeper;

    // records every address this provider has ever seen;
    // if the zookeeper cluster fails, the addresses in trace can serve as a backup
    private Set<String> trace = new HashSet<String>();

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    private Object lock = new Object();

    private static final Integer DEFAULT_PRIORITY = 1;

    public void setConfigPath(String configPath) {
        this.configPath = configPath;
    }

    public void setZookeeper(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // start zk if it has not been started yet
        if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
            zookeeper.start();
        }
        buildPathChildrenCache(zookeeper, configPath, true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    // case CONNECTION_RECONNECTED:
                    //
                    //     break;
                    case CONNECTION_SUSPENDED:
                    case CONNECTION_LOST:
                        System.out.println("Connection error, waiting...");
                        return;
                    default:
                        //
                }
                // any actual data change on any node triggers a rebuild; this is the "simple" approach.
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // it is possible that every thrift server has lost its connection to zookeeper
                    // while the network between thrift client and thrift server is still fine,
                    // so whether to clear the container here deserves careful consideration.
                    container.clear();
                    System.out.println("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                for (ChildData data : children) {
                    String address = new String(data.getData(), "utf-8");
                    current.addAll(transfer(address));
                    trace.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer priority = DEFAULT_PRIORITY;
        if (hostname.length == 3) {
            priority = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < priority; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> getAll() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public synchronized InetSocketAddress selector() {
        if (inner.isEmpty()) {
            if (!container.isEmpty()) {
                inner.addAll(container);
            } else if (!trace.isEmpty()) {
                synchronized (lock) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
        }
        return inner.poll(); // may be null
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zookeeper.close();
        } catch (Exception e) {
            //
        }
    }
}
At this point, our Thrift setup can run smoothly.
Building a Cluster with ZooKeeper
The usual approach is to use the apache-curator component: Thrift connects to zk, which provides cluster service support and pushes notifications when service information changes.
Other Advanced Topics
Turning Thrift into a proper service platform is mostly client-side work and can proceed along these lines:
1. Server-side service registration with automatic client-side discovery, so no configuration has to be edited by hand. We use zookeeper here, but since zookeeper's own client API is rather cumbersome, we handle service registration and discovery with the curator-recipes utilities.
2. The client manages service calls through a connection pool to improve performance; we use commons-pool from the Apache Commons project, which greatly reduces code complexity.
3. For failover/load balancing: thanks to zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes the dead node. Many load-balancing algorithms exist; here we use random weighting, a common choice. For the others, see any overview of basic load-balancing algorithms.
4. Making thrift service registration and discovery configurable through Spring, which is very convenient.
5. Other refinements, such as:
1) Hiding the client/server interaction details behind a dynamic proxy, so users access the service purely through the interface provided by the service publisher.
2) Thrift offers two calling styles, Client and Iface:
// *) calling through the Client API
((EchoService.Client) client).echo("hello lilei"); ---(1)
// *) calling through the Service interface
((EchoService.Iface) service).echo("hello lilei"); ---(2)
The Client API style is not recommended; we recommend the Service-interface style (service-oriented).
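The random weighting mentioned in point 3 can be sketched by expanding each address by its weight, which is exactly what the transfer() method earlier does (names here are illustrative):

```java
import java.util.*;

// Sketch of weighted random selection: each server appears in the
// candidate list once per unit of weight, so a uniform random pick
// over the expanded list becomes a weighted one.
public class WeightedSelector {
    private final List<String> expanded = new ArrayList<>();

    public void add(String address, int weight) {
        for (int i = 0; i < weight; i++) {
            expanded.add(address);
        }
    }

    public String select(Random random) {
        return expanded.get(random.nextInt(expanded.size()));
    }

    // Probability of picking an address = its weight / total weight.
    public double probabilityOf(String address) {
        long hits = expanded.stream().filter(address::equals).count();
        return (double) hits / expanded.size();
    }
}
```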
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.2</version>
</dependency>
<dependency>
    <groupId>commons-pool</groupId>
    <artifactId>commons-pool</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.0.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.1</version>
</dependency>
2. Managing service node configuration with ZooKeeper
As RPC services evolve toward a platform, more of the service details are hidden (the service's IP cluster, cluster expansion and migration) and only the service interface is exposed. This evolution fully decouples server and client, whose interaction is brokered by a ConfigServer (MetaServer) acting as intermediary.
Note: the diagram referenced here comes from the official dubbo website.
Here ZooKeeper plays that role: the server acts as the publisher, the client as the subscriber.
ZooKeeper is a coordination service for distributed applications. It implements a Paxos-like consensus protocol and plays an important role in naming, configuration push, data synchronization, and master/slave failover. Its data is organized like a filesystem directory tree:
Each node is called a znode. By their characteristics, znodes fall into the following types:
1) PERSISTENT: a permanent node
2) EPHEMERAL: an ephemeral node, which disappears together with its session (client disconnect)
3) PERSISTENT_SEQUENTIAL: a permanent node whose name carries a monotonically increasing sequence number
4) EPHEMERAL_SEQUENTIAL: an ephemeral node whose name carries a monotonically increasing sequence number
Note: ephemeral nodes cannot have children.
In the watcher observation model, a client can register callbacks for changes to a node's state or content. Two attributes of the Event object deserve attention:
1) KeeperState: Disconnected, SyncConnected, Expired
2) EventType: None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged
The RPC server side:
As the publisher of a concrete business RPC service, a service describes itself with the following elements:
1) namespace: distinguishes different applications
2) service: the service interface, expressed as the publisher's fully qualified class name
3) version: the version number
This borrows Maven's GAV coordinates; a three-dimensional coordinate system suits the service-platform environment better.
*) Data model design
A concrete RPC service registers under the path /rpc/{namespace}/{service}/{version}; all nodes on this path are permanent.
The nodes of an RPC service cluster register under /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the final node is ephemeral.
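The registration path and node payload described above can be built and parsed with a small helper (a sketch with hypothetical names, not part of the project code):

```java
// Sketch of the /rpc/{namespace}/{service}/{version}/{ip:port:weight}
// path scheme: the first four segments are permanent znodes; the last
// (ip:port:weight) is an ephemeral node created by each server instance.
public class RpcPath {

    public static String servicePath(String namespace, String service, String version) {
        return "/rpc/" + namespace + "/" + service + "/" + version;
    }

    public static String nodeName(String ip, int port, int weight) {
        return ip + ":" + port + ":" + weight;
    }

    // Parses "ip:port:weight" back into its parts; weight defaults to 1.
    public static Object[] parseNode(String node) {
        String[] parts = node.split(":");
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
        return new Object[]{parts[0], Integer.parseInt(parts[1]), weight};
    }
}
```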
1. Defining the ZooKeeper client management
ZookeeperFactory.java
package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * Obtains a zookeeper client connection
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // session timeout
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // share a single zk connection
    private boolean singleton = true;

    // global path prefix, typically used to distinguish applications
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT + "/" + namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
2. Registering the service on the server side
Because the server-side configuration needs the local machine's IP address, we define an interface for resolving it.
ThriftServerIpResolve.java
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Resolves the thrift-server IP address used when registering the service.
 * 1) It can be parsed from a special file on a physical machine or VM
 * 2) It can be the IP of the network interface with a given index
 * 3) Or obtained some other way
 */
public interface ThriftServerIpResolve {

    String getServerIp() throws Exception;

    void reset();

    // invoked when the IP changes
    static interface IpResetCallBack {
        public void reset(String newIp);
    }
}
This interface can be implemented in different ways. Below we resolve the IP from the network interfaces; serverIp can also be set explicitly through configuration.
ThriftServerIpLocalNetworkResolve.java
- package cn.slimsmart.thrift.rpc.zookeeper;
-
- import java.net.Inet6Address;
- import java.net.InetAddress;
- import java.net.NetworkInterface;
- import java.net.SocketException;
- import java.util.Enumeration;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
- * 解析网卡Ip
- *
- */
- public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
-
- private Logger logger = LoggerFactory.getLogger(getClass());
-
- //缓存
- private String serverIp;
-
- public void setServerIp(String serverIp) {
- this.serverIp = serverIp;
- }
-
- @Override
- public String getServerIp() {
- if (serverIp != null) {
- return serverIp;
- }
- // 一个主机有多个网络接口
- try {
- Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
- while (netInterfaces.hasMoreElements()) {
- NetworkInterface netInterface = netInterfaces.nextElement();
- // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
- Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
- while (addresses.hasMoreElements()) {
- InetAddress address = addresses.nextElement();
- if(address instanceof Inet6Address){
- continue;
- }
- if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
- serverIp = address.getHostAddress();
- logger.info("resolve server ip :"+ serverIp);
- return serverIp; // return the first match; continuing would overwrite it with a later interface's address
- }
- }
- }
- } catch (SocketException e) {
- e.printStackTrace();
- }
- return serverIp;
- }
-
- @Override
- public void reset() {
- serverIp = null;
- }
- }
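Since the resolver above is plain JDK code, its core logic can be exercised on its own. A minimal standalone sketch (the class name is ours; the result may be null on hosts without a site-local IPv4 address):

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;
import java.util.Enumeration;

// Standalone sketch of the same resolution logic: scan all interfaces and
// return the first site-local IPv4 address (null if the host has none).
public class SiteLocalIpDemo {
    public static String resolve() throws Exception {
        Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
        if (nifs == null) {
            return null;
        }
        for (NetworkInterface nif : Collections.list(nifs)) {
            for (InetAddress addr : Collections.list(nif.getInetAddresses())) {
                if (addr instanceof Inet4Address
                        && addr.isSiteLocalAddress()
                        && !addr.isLoopbackAddress()) {
                    return addr.getHostAddress();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("resolved ip: " + resolve());
    }
}
```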
Next we define the interface for publishing a service, and implement it to publish the service information (interface name, version, IP, port, weight) to zookeeper.
ThriftServerAddressRegister.java
- package cn.slimsmart.thrift.rpc.zookeeper;
-
- /**
-  * Publishes the service address and port to the registry, here a zookeeper server.
-  */
- public interface ThriftServerAddressRegister {
- /**
-  * Publish a service endpoint.
-  * @param service service interface name; must be unique within a product
-  * @param version service interface version, defaults to 1.0.0
-  * @param address address and port the service is published on
-  */
- void register(String service,String version,String address);
- }
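To make the registration concrete, here is a small sketch (the class name and sample values are made up) of how the znode path is composed from service, version, and address. The optional ":weight" suffix on the address is what the client later parses for load balancing:

```java
// Sketch of how the registered znode path is composed. The service name and
// address are hypothetical sample values; ":weight" on the address is optional.
public class ZnodePathDemo {
    public static String servicePath(String service, String version, String address) {
        if (version == null || version.isEmpty()) {
            version = "1.0.0"; // same default the register() implementation applies
        }
        return "/" + service + "/" + version + "/" + address;
    }

    public static void main(String[] args) {
        // -> /com.demo.UserService/1.0.0/192.168.1.10:9090:5
        System.out.println(servicePath("com.demo.UserService", null, "192.168.1.10:9090:5"));
    }
}
```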
Implementation: ThriftServerAddressRegisterZookeeper.java
- package cn.slimsmart.thrift.rpc.zookeeper;
-
- import java.io.UnsupportedEncodingException;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.zookeeper.CreateMode;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.StringUtils;
-
- import cn.slimsmart.thrift.rpc.ThriftException;
-
- /**
-  * Registers the service address in Zookeeper.
-  */
- public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
-
- private Logger logger = LoggerFactory.getLogger(getClass());
-
- private CuratorFramework zkClient;
-
- public ThriftServerAddressRegisterZookeeper(){}
-
- public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
- this.zkClient = zkClient;
- }
-
- public void setZkClient(CuratorFramework zkClient) {
- this.zkClient = zkClient;
- }
-
- @Override
- public void register(String service, String version, String address) {
- if(zkClient.getState() == CuratorFrameworkState.LATENT){
- zkClient.start();
- }
- if(StringUtils.isEmpty(version)){
- version="1.0.0";
- }
- // ephemeral node: it disappears automatically when the server's session ends
- try {
- zkClient.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL)
- .forPath("/"+service+"/"+version+"/"+address);
- } catch (UnsupportedEncodingException e) {
- logger.error("register service address to zookeeper exception:{}",e);
- throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
- } catch (Exception e) {
- logger.error("register service address to zookeeper exception:{}",e);
- throw new ThriftException("register service address to zookeeper exception:{}", e);
- }
- }
-
- public void close(){
- zkClient.close();
- }
- }
3. Client-side service discovery
First, define the interface for obtaining service addresses.
ThriftServerAddressProvider.java
- package cn.slimsmart.thrift.rpc.zookeeper;
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
- /**
-  * Provides thrift server addresses, used to build the client connection pool.
-  */
- public interface ThriftServerAddressProvider {
-
- // returns the service name
- String getService();
-
- /**
-  * Returns all server addresses.
-  * @return all currently known server addresses
-  */
- List<InetSocketAddress> findServerAddressList();
-
- /**
-  * Picks one suitable address, e.g. at random;
-  * internally any suitable selection algorithm can be used.
-  * @return the selected address
-  */
- InetSocketAddress selector();
-
- void close();
- }
Implementation with automatic service-address discovery via zookeeper: ThriftServerAddressProviderZookeeper.java
- package cn.slimsmart.thrift.rpc.zookeeper;
-
- import java.net.InetSocketAddress;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- import java.util.Set;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.curator.framework.recipes.cache.ChildData;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.InitializingBean;
-
- /**
-  * Uses zookeeper as the "config" center; the apache-curator library simplifies the zookeeper code.
-  */
- public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
-
- private Logger logger = LoggerFactory.getLogger(getClass());
-
- // name of the registered service
- private String service;
- // service version
- private String version = "1.0.0";
-
- private PathChildrenCache cachedPath;
-
- private CuratorFramework zkClient;
-
- // keeps every address this provider has ever seen;
- // if the zookeeper cluster fails, the trace set serves as a "backup"
- private Set<String> trace = new HashSet<String>();
-
- private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
-
- private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
-
- private Object lock = new Object();
-
- // default weight
- private static final Integer DEFAULT_WEIGHT = 1;
-
- public void setService(String service) {
- this.service = service;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public ThriftServerAddressProviderZookeeper() {
- }
-
- public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
- this.zkClient = zkClient;
- }
-
- public void setZkClient(CuratorFramework zkClient) {
- this.zkClient = zkClient;
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- // start the zk client if it has not been started yet
- if (zkClient.getState() == CuratorFrameworkState.LATENT) {
- zkClient.start();
- }
- buildPathChildrenCache(zkClient, getServicePath(), true);
- cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
- }
-
- private String getServicePath(){
- return "/" + service + "/" + version;
- }
- private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
- cachedPath = new PathChildrenCache(client, path, cacheData);
- cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- PathChildrenCacheEvent.Type eventType = event.getType();
- switch (eventType) {
- case CONNECTION_RECONNECTED:
- logger.info("Connection is reconnected.");
- break;
- case CONNECTION_SUSPENDED:
- logger.info("Connection is suspended.");
- break;
- case CONNECTION_LOST:
- logger.warn("Connection lost, waiting...");
- return;
- default:
- //
- }
- // any actual data change on any node triggers a rebuild; this is the "simple" approach
- cachedPath.rebuild();
- rebuild();
- }
-
- protected void rebuild() throws Exception {
- List<ChildData> children = cachedPath.getCurrentData();
- if (children == null || children.isEmpty()) {
- // possibly every thrift server has lost its connection to zookeeper,
- // while the network between thrift client and thrift server is still fine;
- // whether to clear the container here therefore deserves careful thought
- container.clear();
- logger.error("thrift server-cluster error....");
- return;
- }
- List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
- String path = null;
- for (ChildData data : children) {
- path = data.getPath();
- logger.debug("get path:"+path);
- path = path.substring(getServicePath().length()+1);
- logger.debug("get serviceAddress:"+path);
- String address = new String(path.getBytes(), "utf-8");
- current.addAll(transfer(address));
- trace.add(address);
- }
- Collections.shuffle(current);
- synchronized (lock) {
- container.clear();
- container.addAll(current);
- inner.clear();
- inner.addAll(current);
-
- }
- }
- });
- }
-
- private List<InetSocketAddress> transfer(String address) {
- String[] hostname = address.split(":");
- Integer weight = DEFAULT_WEIGHT;
- if (hostname.length == 3) {
- weight = Integer.valueOf(hostname[2]);
- }
- String ip = hostname[0];
- Integer port = Integer.valueOf(hostname[1]);
- List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
- // add ip:port to the address list as many times as its weight; picking randomly then yields weighted load balancing
- for (int i = 0; i < weight; i++) {
- result.add(new InetSocketAddress(ip, port));
- }
- return result;
- }
-
- @Override
- public List<InetSocketAddress> findServerAddressList() {
- return Collections.unmodifiableList(container);
- }
-
- @Override
- public synchronized InetSocketAddress selector() {
- if (inner.isEmpty()) {
- if (!container.isEmpty()) {
- inner.addAll(container);
- } else if (!trace.isEmpty()) {
- synchronized (lock) {
- for (String hostname : trace) {
- container.addAll(transfer(hostname));
- }
- Collections.shuffle(container);
- inner.addAll(container);
- }
- }
- }
- return inner.poll();
- }
-
- @Override
- public void close() {
- try {
- cachedPath.close();
- zkClient.close();
- } catch (Exception e) {
- }
- }
-
- @Override
- public String getService() {
- return service;
- }
-
- }
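The weighted selection in transfer() and selector() can be distilled into a standalone sketch (the class name is ours): each address enters the list weight times, the list is shuffled once, and a queue is drained round by round:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Standalone sketch of the weighted selection above: each "ip:port[:weight]"
// entry is expanded weight times, the list is shuffled once, and selector()
// drains a queue that is refilled from the full list when empty.
public class WeightedSelectorDemo {

    public static List<InetSocketAddress> transfer(String address) {
        String[] parts = address.split(":");
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < weight; i++) {
            // createUnresolved avoids a DNS lookup in this sketch
            result.add(InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
    private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    public WeightedSelectorDemo(List<String> addresses) {
        for (String a : addresses) {
            container.addAll(transfer(a));
        }
        Collections.shuffle(container);
    }

    public synchronized InetSocketAddress selector() {
        if (inner.isEmpty()) {
            inner.addAll(container); // start a new "round"
        }
        return inner.poll();
    }
}
```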
There is also a second implementation of this interface that reads the addresses from configuration; see the attachment: FixedAddressProvider.java
III. Server-side service registration implementation
ThriftServiceServerFactory.java
IV. Client-side service proxy and connection pool implementation
Connection pool implementation: ThriftClientPoolFactory.java
- package cn.slimsmart.thrift.rpc;
-
- import java.net.InetSocketAddress;
-
- import org.apache.commons.pool.BasePoolableObjectFactory;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.apache.thrift.protocol.TBinaryProtocol;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.transport.TFramedTransport;
- import org.apache.thrift.transport.TSocket;
- import org.apache.thrift.transport.TTransport;
-
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
-
- /**
-  * Connection pool: thrift-client for Spring.
-  */
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
-
- private final ThriftServerAddressProvider serverAddressProvider;
- private final TServiceClientFactory<TServiceClient> clientFactory;
- private PoolOperationCallBack callback;
-
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
- this.serverAddressProvider = addressProvider;
- this.clientFactory = clientFactory;
- }
-
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
- PoolOperationCallBack callback) throws Exception {
- this.serverAddressProvider = addressProvider;
- this.clientFactory = clientFactory;
- this.callback = callback;
- }
-
- static interface PoolOperationCallBack {
- // called just before a client is destroyed
- void destroy(TServiceClient client);
-
- // called after a client has been created successfully
- void make(TServiceClient client);
- }
-
- public void destroyObject(TServiceClient client) throws Exception {
- if (callback != null) {
- try {
- callback.destroy(client);
- } catch (Exception e) {
- //
- }
- }
- TTransport pin = client.getInputProtocol().getTransport();
- pin.close();
- }
-
- public boolean validateObject(TServiceClient client) {
- TTransport pin = client.getInputProtocol().getTransport();
- return pin.isOpen();
- }
-
- @Override
- public TServiceClient makeObject() throws Exception {
- InetSocketAddress address = serverAddressProvider.selector();
- TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
- TTransport transport = new TFramedTransport(tsocket);
- TProtocol protocol = new TBinaryProtocol(transport);
- TServiceClient client = this.clientFactory.getClient(protocol);
- transport.open();
- if (callback != null) {
- try {
- callback.make(client);
- } catch (Exception e) {
- //
- }
- }
- return client;
- }
-
- }
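The factory above only supplies the callbacks; the borrow/validate/return cycle itself is driven by commons-pool's GenericObjectPool. As a rough illustration of that contract (this MiniPool is our own toy, not commons-pool):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Toy illustration of the lifecycle GenericObjectPool drives through the
// factory above: borrow validates idle objects (destroying stale ones) and
// creates a new one when none is usable; giveBack returns one to the pool.
public class MiniPool<T> {
    private final Deque<T> idle = new ArrayDeque<T>();
    private final Supplier<T> maker;      // plays the role of makeObject()
    private final Predicate<T> validator; // plays the role of validateObject()
    private final Consumer<T> destroyer;  // plays the role of destroyObject()

    public MiniPool(Supplier<T> maker, Predicate<T> validator, Consumer<T> destroyer) {
        this.maker = maker;
        this.validator = validator;
        this.destroyer = destroyer;
    }

    public synchronized T borrow() {
        while (!idle.isEmpty()) {
            T t = idle.poll();
            if (validator.test(t)) {
                return t;            // healthy idle object: reuse it
            }
            destroyer.accept(t);     // stale object: destroy and try the next
        }
        return maker.get();          // nothing idle: create a fresh object
    }

    public synchronized void giveBack(T t) {
        idle.push(t);
    }
}
```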
Client-side service proxy factory implementation: ThriftServiceClientProxyFactory.java
- package cn.slimsmart.thrift.rpc;
-
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
-
- import org.apache.commons.pool.impl.GenericObjectPool;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.beans.factory.InitializingBean;
-
- import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
-
- /**
-  * Client-side proxy.
-  */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
-
- private Integer maxActive = 32; // maximum number of active connections
-
- // connection idle time in ms, default 3 minutes;
- // -1 disables idle eviction
- private Integer idleTime = 180000;
- private ThriftServerAddressProvider serverAddressProvider;
-
- private Object proxyClient;
- private Class<?> objectClass;
-
- private GenericObjectPool<TServiceClient> pool;
-
- private PoolOperationCallBack callback = new PoolOperationCallBack() {
- @Override
- public void make(TServiceClient client) {
- System.out.println("create");
- }
-
- @Override
- public void destroy(TServiceClient client) {
- System.out.println("destroy");
- }
- };
-
- public void setMaxActive(Integer maxActive) {
- this.maxActive = maxActive;
- }
-
- public void setIdleTime(Integer idleTime) {
- this.idleTime = idleTime;
- }
-
- public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
- this.serverAddressProvider = serverAddressProvider;
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- // load the service's Iface interface
- objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
- // load the Client.Factory class
- Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
- TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
- ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = maxActive;
- poolConfig.minIdle = 0;
- poolConfig.minEvictableIdleTimeMillis = idleTime;
- poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
- pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
- proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- //
- TServiceClient client = pool.borrowObject();
- try {
- return method.invoke(client, args);
- } catch (Exception e) {
- throw e;
- } finally {
- pool.returnObject(client);
- }
- }
- });
- }
-
- @Override
- public Object getObject() throws Exception {
- return proxyClient;
- }
-
- @Override
- public Class<?> getObjectType() {
- return objectClass;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- public void close() {
- if (serverAddressProvider != null) {
- serverAddressProvider.close();
- }
- }
- }
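The heart of this factory is the JDK dynamic proxy: every call on the Iface proxy is funneled through an InvocationHandler, which is where the pool's borrow/return happens. A self-contained sketch (Iface and Client here are stand-ins for the generated thrift interface and the pooled TServiceClient):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Self-contained sketch of the dynamic-proxy pattern used above. Iface and
// Client are stand-ins for the generated thrift interface and the pooled
// TServiceClient; the handler's comments mark where borrow/return would go.
public class ProxyDemo {

    public interface Iface {
        String getById(long id);
    }

    public static class Client implements Iface {
        public String getById(long id) {
            return "user-" + id;
        }
    }

    public static Iface proxy(final Iface target) {
        return (Iface) Proxy.newProxyInstance(
                Iface.class.getClassLoader(),
                new Class<?>[] { Iface.class },
                new InvocationHandler() {
                    public Object invoke(Object p, Method m, Object[] args) throws Throwable {
                        // real factory: TServiceClient client = pool.borrowObject();
                        try {
                            return m.invoke(target, args);
                        } finally {
                            // real factory: pool.returnObject(client);
                        }
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(proxy(new Client()).getById(42)); // prints user-42
    }
}
```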
Finally, let's look at the server and client configuration.
Server: spring-context-thrift-server.xml
Client: spring-context-thrift-client.xml
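The two XML files themselves are not reproduced here. As a purely hypothetical sketch (bean ids, the service name, and the zkClient bean are our assumptions, derived only from the setters shown above), the client side might be wired like this:

```xml
<!-- zkClient: a CuratorFramework instance, e.g. produced by the zookeeper
     factory class shown at the beginning of this section -->

<bean id="serverAddressProvider"
      class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
    <!-- 'service' must be the fully-qualified name of the generated thrift
         class, because the proxy factory loads service + "$Iface" -->
    <property name="service" value="com.demo.UserService"/>
    <property name="version" value="1.0.0"/>
    <property name="zkClient" ref="zkClient"/>
</bean>

<bean id="userService" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
    <property name="maxActive" value="32"/>
    <property name="idleTime" value="180000"/>
    <property name="serverAddressProvider" ref="serverAddressProvider"/>
</bean>
```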
After starting the server, we can see multiple service addresses registered in zookeeper.
Original article: http://www.cnblogs.com/scott19820130/p/4940274.html