标签:
一、集群容错
技术选型:hystrix。(就是上图中熔断器)
熔断的作用:
第一个作用:
假设有两台服务器server1(假设可以处理的请求阈值是1W请求)和server2,在server1上注册了三个服务service1、service2、service3,在server2上注册了一个服务service4,假设service4服务响应缓慢,service1调用service4时,一直在等待响应,那么在高并发下,很快的server1处很快就会达到请求阈值(server1很快就会耗尽处理线程)之后可能宕机,这时候,不只是service1不再可用,server1上的service2和service3也不可用了。
如果我们引入了hystrix,那么service1调用service4的时候,当发现service4超时,立即断掉不再执行,执行getFallback逻辑。这样的话,server1就不会耗尽处理线程,server1上的其他服务也是可用的。当然,这是在合理的配置了超时时间的情况下,如果超时时间设置的太长的话,还是会出现未引入hystrix之前的情况。
第二个作用:
当被调服务经常失败,比如说在10min(可配)中之内调用了20次,失败了15次(可配),那么我们认为这个服务是失败的,先关闭该服务,等一会儿后再自动重新启动该服务!(这是真正的熔断!)
二、实现
1、framework
1.1、pom.xml
1 <!-- converter-jackson --> 2 <dependency> 3 <groupId>com.squareup.retrofit</groupId> 4 <artifactId>converter-jackson</artifactId> 5 <version>1.9.0</version> 6 </dependency> 7 <!-- async-http-client --> 8 <dependency> 9 <groupId>com.ning</groupId> 10 <artifactId>async-http-client</artifactId> 11 <version>1.9.31</version> 12 </dependency> 13 14 <!-- hystrix --> 15 <dependency> 16 <groupId>com.netflix.hystrix</groupId> 17 <artifactId>hystrix-core</artifactId> 18 <version>1.5.3</version> 19 </dependency> 20 <dependency> 21 <groupId>com.netflix.hystrix</groupId> 22 <artifactId>hystrix-metrics-event-stream</artifactId> 23 <version>1.5.3</version> 24 </dependency>
说明:
1.2、服务通信(retrofit)+集群容错(hystrix)
1.2.1、RestAdapterConfig
1 package com.microservice.retrofit; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Component; 5 6 import com.microservice.loadBalancer.MyLoadBalancer; 7 import com.microservice.loadBalancer.ServerAddress; 8 9 import retrofit.RestAdapter; 10 import retrofit.converter.JacksonConverter; 11 12 @Component 13 public class RestAdapterConfig { 14 15 @Autowired 16 private MyLoadBalancer myLoadBalancer; 17 18 /** 19 * 负载均衡并且创建传入的API接口实例 20 */ 21 public <T> T create(Class<T> tclass, String serviceName) { 22 String commandGroupKey = tclass.getSimpleName();// 获得简单类名作为groupKey 23 24 ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 负载均衡 25 RestAdapter restAdapter = new RestAdapter.Builder() 26 .setConverter(new JacksonConverter()) 27 .setErrorHandler(new MyErrorHandler()) 28 .setClient(new MyHttpClient(server, commandGroupKey)) 29 .setEndpoint("/").build(); 30 T tclassInstance = restAdapter.create(tclass); 31 return tclassInstance; 32 } 33 }
说明:这里我们定义了自己的retrofit.Client和自己的retrofit.ErrorHandler
1.2.2、MyHttpClient(自定义retrofit的Client)
1 package com.microservice.retrofit; 2 3 import java.io.IOException; 4 5 import com.microservice.hystrix.HttpHystrixCommand; 6 import com.microservice.loadBalancer.ServerAddress; 7 import com.netflix.hystrix.HystrixCommand.Setter; 8 import com.netflix.hystrix.HystrixCommandGroupKey; 9 10 import retrofit.client.Client; 11 import retrofit.client.Request; 12 import retrofit.client.Response; 13 14 public class MyHttpClient implements Client { 15 private ServerAddress server; 16 private String commandGroupKey; 17 18 public MyHttpClient(ServerAddress server, String commandGroupKey) { 19 this.server = server; 20 this.commandGroupKey = commandGroupKey; 21 } 22 23 @Override 24 public Response execute(Request request) throws IOException { 25 Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey)); 26 return new HttpHystrixCommand(setter, server, request).execute();// 同步执行 27 } 28 }
说明:在execute()中引入了hystrix
1.2.3、HttpHystrixCommand(hystrix核心类)
1 package com.microservice.hystrix; 2 3 import java.io.ByteArrayOutputStream; 4 import java.io.IOException; 5 import java.util.ArrayList; 6 import java.util.List; 7 import java.util.concurrent.Future; 8 9 import org.apache.commons.lang3.StringUtils; 10 import org.apache.tomcat.util.http.fileupload.IOUtils; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 13 14 import com.microservice.loadBalancer.ServerAddress; 15 import com.netflix.hystrix.HystrixCommand; 16 import com.ning.http.client.AsyncHttpClient; 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap; 18 import com.ning.http.client.RequestBuilder; 19 20 import retrofit.client.Header; 21 import retrofit.client.Request; 22 import retrofit.client.Response; 23 import retrofit.mime.TypedByteArray; 24 import retrofit.mime.TypedOutput; 25 26 public class HttpHystrixCommand extends HystrixCommand<Response> { 27 private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class); 28 29 private ServerAddress server; 30 private Request request; 31 private String requestUrl; 32 private AsyncHttpClient asyncHttpClient; 33 34 public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) { 35 super(setter); 36 this.server = server; 37 this.request = request; 38 39 // AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s 40 // this.asyncHttpClient = new AsyncHttpClient(builder.build()); 41 this.asyncHttpClient = new AsyncHttpClient(); 42 } 43 44 @Override 45 public Response run() throws Exception { 46 com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server); 47 Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq); 48 com.ning.http.client.Response asyncRes = asyncResFuture.get(); 49 return asynRes2RetroRes(asyncRes); 50 } 51 52 /** 53 * 1、设置方法请求类型,例如:GET/POST 54 * 2、转换请求头header(包括mime。这个需要根据请求体的情况进行掌握) 55 * 3、转换请求体 56 * 4、设置请求URL 57 */ 58 public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) { 59 RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//传入方法请求类型,例如:GET/POST 60 List<Header> headers = request.getHeaders(); 61 headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue())); 62 63 if (request.getBody() != null) { 64 String mimeType = StringUtils.EMPTY; 65 if (StringUtils.isNotEmpty(mimeType)) { 66 requestBuilder.addHeader("Content-Type", mimeType); 67 } else { 68 requestBuilder.addHeader("Content-Type", "application/json"); 69 } 70 71 TypedOutput body = request.getBody(); 72 ByteArrayOutputStream outPutStream = new ByteArrayOutputStream(); 73 try { 74 body.writeTo(outPutStream);//将body内容写入到ByteArrayOutputStream里 75 requestBuilder.setBody(outPutStream.toByteArray()); 76 } catch (IOException e) { 77 e.printStackTrace(); 78 } finally { 79 IOUtils.closeQuietly(outPutStream); 80 } 81 } 82 String url = new StringBuilder("http://").append(server.getIp()) 83 .append(":") 84 .append(server.getPort()) 85 .append("/") 86 .append(request.getUrl()).toString(); 87 requestUrl = url; 88 requestBuilder.setUrl(url); 89 return requestBuilder.build(); 90 } 91 92 public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException { 93 return new Response(asyncRes.getUri().toUrl(), 94 asyncRes.getStatusCode(), 95 asyncRes.getStatusText(), 96 getHeaders(asyncRes.getHeaders()), 97 new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes())); 98 } 99 100 private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) { 101 List<Header> retrofitHeaders = new ArrayList<>(); 102 asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key)))); 103 return retrofitHeaders; 104 } 105 106 /** 107 * 超时后的一些操作,或者如果缓存中有信息,可以从缓存中拿一些,具体的要看业务,也可以打一些logger 108 */ 109 @Override 110 public Response getFallback() { 111 LOGGER.error("请求超时了!requestUrl:‘{}‘", requestUrl); 112 /** 113 * 想要让自定义的ErrorHandler起作用以及下边的404和reason有意义,就一定要配置requestUrl和List<header> 114 * 其实这里可以看做是定义自定义异常的状态码和状态描述 115 * 其中状态码用于自定义异常中的判断(见HystrixRuntimeException) 116 */ 117 return new Response(requestUrl, 404, //定义状态码 118 "execute getFallback because execution timeout", //定义消息 119 new ArrayList<Header>(), null); 120 } 121 }
说明:首先调用run(),run()失败或超时候调用getFallback()
1.2.4、MyErrorHandler(自定义retrofit的错误处理器)
1 package com.microservice.retrofit; 2 3 import com.microservice.exception.HystrixRuntimeException; 4 5 import retrofit.ErrorHandler; 6 import retrofit.RetrofitError; 7 import retrofit.client.Response; 8 9 public class MyErrorHandler implements ErrorHandler{ 10 @Override 11 public Throwable handleError(RetrofitError cause) { 12 Response response = cause.getResponse(); 13 /** 14 * 这里是一个可以定制的地方,自己可以定义所有想要捕获的异常 15 */ 16 if(response!=null && response.getStatus()==404){ 17 return new HystrixRuntimeException(cause); 18 } 19 return cause; 20 } 21 }
说明:当发生了retrofit.error时(不只是上边的getFallback()返回的Response),我们可以在该ErrorHandler的handleError方法来进行相应Response的处理。这里我们指定当404时返回一个自定义异常。
1.2.5、HystrixRuntimeException(自定义异常)
1 package com.microservice.exception; 2 3 /** 4 * 自定义异常 5 */ 6 public class HystrixRuntimeException extends RuntimeException { 7 private static final long serialVersionUID = 8252124808929848902L; 8 9 public HystrixRuntimeException(Throwable cause) { 10 super(cause);//只有这样,才能将异常信息抛给客户端 11 } 12 }
说明:自定义异常只能通过super()来向客户端抛出自己指定的异常信息(上边的Response的reason,但是抛到客户端时还是一个500错误,因为run()错误或超时就是一个服务端错误)。
整个流程:
当myserviceB调用myserviceA的一个方法时,首先会执行自定义的MyHttpClient的execute()方法,在该execute()方法中我们执行了自定义的HttpHystrixCommand的execute()方法,此时就会执行执行HttpHystrixCommand的run()方法,如果该方法运行正常并在超时时间内返回数据,则调用结束。
如果run()方法调用失败或该方法超时,就会直接运行HttpHystrixCommand的getFallback()方法。该方法返回一个retrofit.Response对象,该对象的status是404,错误信息也是自定义的。之后该对象会被包装到RetrofitError对象中,之后RetrofitError对象会由MyErrorHandler的handleError()进行处理:从RetrofitError对象中先取出Response,之后根据该Response的status执行相应的操作,我们这里对404的情况定义了一个自定义异常HystrixRuntimeException。
注意点:
三、配置与测试
1、配置
在consul上配置service/myserviceA/dev/config的配置内容和service/myserviceB/dev/config的内容。其中,myserviceB配置了hystrix的超时时间:
1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000
说明:关于hystrix的配置参数,查看http://www.cnblogs.com/java-zhao/p/5524584.html
2、测试
最后,启动consul,启动服务A和B,swagger测试就好了!!!(在测试过程中,可以动态的去改变consul中hystrix的超时时间值,来测试archaius的动态读取)
第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)
标签:
原文地址:http://www.cnblogs.com/java-zhao/p/5734835.html