码迷,mamicode.com
首页 > Web开发 > 详细

第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)

时间:2016-08-03 23:55:44      阅读:1251      评论:0      收藏:0      [点我收藏+]

标签:

技术分享

 

一、集群容错

技术选型:hystrix。(就是上图中熔断器

熔断的作用

第一个作用:

假设有两台服务器server1(假设可以处理的请求阈值是1W请求)和server2,在server1上注册了三个服务service1、service2、service3,在server2上注册了一个服务service4,假设service4服务响应缓慢,service1调用service4时,一直在等待响应,那么在高并发下,很快的server1处很快就会达到请求阈值(server1很快就会耗尽处理线程)之后可能宕机,这时候,不只是service1不再可用,server1上的service2和service3也不可用了。

如果我们引入了hystrix,那么service1调用service4的时候,当发现service4超时,立即断掉不再执行,执行getFallback逻辑。这样的话,server1就不会耗尽处理线程,server1上的其他服务也是可用的。当然,这是在合理的配置了超时时间的情况下,如果超时时间设置的太长的话,还是会出现未引入hystrix之前的情况。

第二个作用:

当被调服务经常失败,比如说在10min(可配)中之内调用了20次,失败了15次(可配),那么我们认为这个服务是失败的,先关闭该服务,等一会儿后再自动重新启动该服务!(这是真正的熔断!)

 

二、实现

1、framework

技术分享

1.1、pom.xml

技术分享
 1         <!-- converter-jackson -->
 2         <dependency>
 3             <groupId>com.squareup.retrofit</groupId>
 4             <artifactId>converter-jackson</artifactId>
 5             <version>1.9.0</version>
 6         </dependency>
 7         <!-- async-http-client -->
 8         <dependency>
 9             <groupId>com.ning</groupId>
10             <artifactId>async-http-client</artifactId>
11             <version>1.9.31</version>
12         </dependency>
13 
14         <!-- hystrix -->
15         <dependency>
16             <groupId>com.netflix.hystrix</groupId>
17             <artifactId>hystrix-core</artifactId>
18             <version>1.5.3</version>
19         </dependency>
20         <dependency>
21             <groupId>com.netflix.hystrix</groupId>
22             <artifactId>hystrix-metrics-event-stream</artifactId>
23             <version>1.5.3</version>
24         </dependency>
View Code

说明:

  • 添加retrofit的Jackson转换器,默认为GSON
  • 添加AsyncHttpClient
  • 添加hystrix及其metrics包(后者用于展示hystrix的图表信息,以后会在优化部分完成)

1.2、服务通信(retrofit)+集群容错(hystrix)

1.2.1、RestAdapterConfig

技术分享
 1 package com.microservice.retrofit;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Component;
 5 
 6 import com.microservice.loadBalancer.MyLoadBalancer;
 7 import com.microservice.loadBalancer.ServerAddress;
 8 
 9 import retrofit.RestAdapter;
10 import retrofit.converter.JacksonConverter;
11 
12 @Component
13 public class RestAdapterConfig {
14 
15     @Autowired
16     private MyLoadBalancer myLoadBalancer;
17 
18     /**
19      * 负载均衡并且创建传入的API接口实例
20      */
21     public <T> T create(Class<T> tclass, String serviceName) {
22         String commandGroupKey = tclass.getSimpleName();// 获得简单类名作为groupKey
23 
24         ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 负载均衡
25         RestAdapter restAdapter = new RestAdapter.Builder()
26                                   .setConverter(new JacksonConverter())
27                                   .setErrorHandler(new MyErrorHandler())
28                                   .setClient(new MyHttpClient(server, commandGroupKey))
29                                   .setEndpoint("/").build();
30         T tclassInstance = restAdapter.create(tclass);
31         return tclassInstance;
32     }
33 }
View Code

说明:这里我们定义了自己的retrofit.Client和自己的retrofit.ErrorHandler

1.2.2、MyHttpClient(自定义retrofit的Client)

技术分享
 1 package com.microservice.retrofit;
 2 
 3 import java.io.IOException;
 4 
 5 import com.microservice.hystrix.HttpHystrixCommand;
 6 import com.microservice.loadBalancer.ServerAddress;
 7 import com.netflix.hystrix.HystrixCommand.Setter;
 8 import com.netflix.hystrix.HystrixCommandGroupKey;
 9 
10 import retrofit.client.Client;
11 import retrofit.client.Request;
12 import retrofit.client.Response;
13 
14 public class MyHttpClient implements Client {
15     private ServerAddress server;
16     private String        commandGroupKey;
17 
18     public MyHttpClient(ServerAddress server, String commandGroupKey) {
19         this.server = server;
20         this.commandGroupKey = commandGroupKey;
21     }
22 
23     @Override
24     public Response execute(Request request) throws IOException {
25         Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
26         return new HttpHystrixCommand(setter, server, request).execute();// 同步执行
27     }
28 }
View Code

说明:在execute()中引入了hystrix

  • 定义了hystrix的commandGroupKey是服务名(eg.myserviceA,被调用服务名
  • 没有定义commandKey(通常commandKey是服务的一个方法名,例如myserviceA的client的getProvinceByCityName),通常该方法名是被调用服务的client中的被调用方法名
  • 手动设置hystrix的属性
    • setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000));
    • 实际上,直接配置在consul上就好了,根据上一节archaius的自动拉取配置,hystrix会自动从pollResult中取配置并设置到实例中去
  • 查看hystrix的属性
    • command.getProperties().executionTimeoutInMilliseconds().get(),这里的command就是下边的HttpHystrixCommand实例

1.2.3、HttpHystrixCommand(hystrix核心类)

技术分享
  1 package com.microservice.hystrix;
  2 
  3 import java.io.ByteArrayOutputStream;
  4 import java.io.IOException;
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 import java.util.concurrent.Future;
  8 
  9 import org.apache.commons.lang3.StringUtils;
 10 import org.apache.tomcat.util.http.fileupload.IOUtils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13 
 14 import com.microservice.loadBalancer.ServerAddress;
 15 import com.netflix.hystrix.HystrixCommand;
 16 import com.ning.http.client.AsyncHttpClient;
 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap;
 18 import com.ning.http.client.RequestBuilder;
 19 
 20 import retrofit.client.Header;
 21 import retrofit.client.Request;
 22 import retrofit.client.Response;
 23 import retrofit.mime.TypedByteArray;
 24 import retrofit.mime.TypedOutput;
 25 
 26 public class HttpHystrixCommand extends HystrixCommand<Response> {
 27     private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class);
 28 
 29     private ServerAddress       server;
 30     private Request             request;
 31     private String              requestUrl;
 32     private AsyncHttpClient     asyncHttpClient;
 33 
 34     public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) {
 35         super(setter);
 36         this.server = server;
 37         this.request = request;
 38 
 39         //        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s
 40         //        this.asyncHttpClient = new AsyncHttpClient(builder.build());
 41         this.asyncHttpClient = new AsyncHttpClient();
 42     }
 43 
 44     @Override
 45     public Response run() throws Exception {
 46         com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server);
 47         Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq);
 48         com.ning.http.client.Response asyncRes = asyncResFuture.get();
 49         return asynRes2RetroRes(asyncRes);
 50     }
 51 
 52     /**
 53      * 1、设置方法请求类型,例如:GET/POST
 54      * 2、转换请求头header(包括mime。这个需要根据请求体的情况进行掌握)
 55      * 3、转换请求体
 56      * 4、设置请求URL
 57      */
 58     public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) {
 59         RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//传入方法请求类型,例如:GET/POST
 60         List<Header> headers = request.getHeaders();
 61         headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue()));
 62 
 63         if (request.getBody() != null) {
 64             String mimeType = StringUtils.EMPTY;
 65             if (StringUtils.isNotEmpty(mimeType)) {
 66                 requestBuilder.addHeader("Content-Type", mimeType);
 67             } else {
 68                 requestBuilder.addHeader("Content-Type", "application/json");
 69             }
 70 
 71             TypedOutput body = request.getBody();
 72             ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
 73             try {
 74                 body.writeTo(outPutStream);//将body内容写入到ByteArrayOutputStream里
 75                 requestBuilder.setBody(outPutStream.toByteArray());
 76             } catch (IOException e) {
 77                 e.printStackTrace();
 78             } finally {
 79                 IOUtils.closeQuietly(outPutStream);
 80             }
 81         }
 82         String url = new StringBuilder("http://").append(server.getIp())
 83                                                  .append(":")
 84                                                  .append(server.getPort())
 85                                                  .append("/")
 86                                                  .append(request.getUrl()).toString();
 87         requestUrl = url;
 88         requestBuilder.setUrl(url);
 89         return requestBuilder.build();
 90     }
 91 
 92     public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException {
 93         return new Response(asyncRes.getUri().toUrl(), 
 94                             asyncRes.getStatusCode(), 
 95                             asyncRes.getStatusText(),
 96                             getHeaders(asyncRes.getHeaders()),
 97                             new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes()));
 98     }
 99 
100     private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) {
101         List<Header> retrofitHeaders = new ArrayList<>();
102         asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key))));
103         return retrofitHeaders;
104     }
105 
106     /**
107      * 超时后的一些操作,或者如果缓存中有信息,可以从缓存中拿一些,具体的要看业务,也可以打一些logger
108      */
109     @Override
110     public Response getFallback() {
111         LOGGER.error("请求超时了!requestUrl:‘{}‘", requestUrl);
112         /**
113          * 想要让自定义的ErrorHandler起作用以及下边的404和reason有意义,就一定要配置requestUrl和List<header>
114          * 其实这里可以看做是定义自定义异常的状态码和状态描述
115          * 其中状态码用于自定义异常中的判断(见HystrixRuntimeException)
116          */
117         return new Response(requestUrl, 404, //定义状态码
118             "execute getFallback because execution timeout", //定义消息 
119             new ArrayList<Header>(), null);
120     }
121 }
View Code

说明:首先调用run(),run()失败或超时候调用getFallback()

  • run()--这里是一个定制口,我使用了AsyncHttpClient,还可以使用其他的网络调用工具,例如:okhttp
    • 首先将Retrofit的请求信息Request转化为AsyncHttpClient的Request(在这里调用了负载均衡,将请求负载到选出的一台机器)
    • 之后调用AsyncHttpClient来进行真正的http调用,并返回AsyncHttpClient型的相应Response
    • 最后将AsyncHttpClient型的响应Response转换为Retrofit型的Response
  • getFallback()
    • 直接抛异常是不行的(该接口不让),只能采取以下的方式
    • 返回一个Response对象,该对象封装了status是404+错误的原因reason+请求的url+相应的Header列表+响应体(这里的status和reason会被用在ErrorHandler中去用于指定执行不同的逻辑,具体看下边的MyErrorHandler)
    • 如果想让MyErrorHandler起作用,Response对象必须有"请求的url+相应的Header列表",其中Header列表可以使一个空List实现类,但是不可为null
  • 在构建AsyncHttpClient实例时可以设置相关的http参数,例如:注释部分的设置请求超时时间。
    • 值得注意的是我们在配置请求超时时间时,要结合hystrix的超时时间来设置,程序会以二者的最小值作为请求超时时间

1.2.4、MyErrorHandler(自定义retrofit的错误处理器)

技术分享
 1 package com.microservice.retrofit;
 2 
 3 import com.microservice.exception.HystrixRuntimeException;
 4 
 5 import retrofit.ErrorHandler;
 6 import retrofit.RetrofitError;
 7 import retrofit.client.Response;
 8 
 9 public class MyErrorHandler implements ErrorHandler{
10     @Override
11     public Throwable handleError(RetrofitError cause) {
12         Response response = cause.getResponse();
13         /**
14          * 这里是一个可以定制的地方,自己可以定义所有想要捕获的异常
15          */
16         if(response!=null && response.getStatus()==404){
17             return new HystrixRuntimeException(cause);
18         }
19         return cause;
20     }
21 }
View Code

说明:当发生了retrofit.error时(不只是上边的getFallback()返回的Response),我们可以在该ErrorHandler的handleError方法来进行相应Response的处理。这里我们指定当404时返回一个自定义异常。

1.2.5、HystrixRuntimeException(自定义异常)

技术分享
 1 package com.microservice.exception;
 2 
 3 /**
 4  * 自定义异常
 5  */
 6 public class HystrixRuntimeException extends RuntimeException {
 7     private static final long serialVersionUID = 8252124808929848902L;
 8 
 9     public HystrixRuntimeException(Throwable cause) {
10         super(cause);//只有这样,才能将异常信息抛给客户端
11     }
12 }
View Code

说明:自定义异常只能通过super()来向客户端抛出自己指定的异常信息(上边的Response的reason,但是抛到客户端时还是一个500错误,因为run()错误或超时就是一个服务端错误)。

 

整个流程:

当myserviceB调用myserviceA的一个方法时,首先会执行自定义的MyHttpClient的execute()方法,在该execute()方法中我们执行了自定义的HttpHystrixCommand的execute()方法,此时就会执行执行HttpHystrixCommand的run()方法,如果该方法运行正常并在超时时间内返回数据,则调用结束。

如果run()方法调用失败或该方法超时,就会直接运行HttpHystrixCommand的getFallback()方法。该方法返回一个retrofit.Response对象,该对象的status是404,错误信息也是自定义的。之后该对象会被包装到RetrofitError对象中,之后RetrofitError对象会由MyErrorHandler的handleError()进行处理:从RetrofitError对象中先取出Response,之后根据该Response的status执行相应的操作,我们这里对404的情况定义了一个自定义异常HystrixRuntimeException。

 注意点:

  • retrofit的Response最好不要是null
  • retrofit的Jackson转换器无法转化单纯的String(因为Jackson转换器会将一个json串转化为json对象),这一点缺点可以看做没有,因为我们的接口都是restful的,那么我们都是使用json格式来通信的。 

 

三、配置与测试

1、配置

在consul上配置service/myserviceA/dev/config的配置内容和service/myserviceB/dev/config的内容。其中,myserviceB配置了hystrix的超时时间:

技术分享
1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000
View Code

说明:关于hystrix的配置参数,查看http://www.cnblogs.com/java-zhao/p/5524584.html

2、测试

最后,启动consul,启动服务A和B,swagger测试就好了!!!(在测试过程中,可以动态的去改变consul中hystrix的超时时间值,来测试archaius的动态读取)

第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)

标签:

原文地址:http://www.cnblogs.com/java-zhao/p/5734835.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!