标签:中间 minutes hashmap request queue 访问 之间 功能 alibaba
在今天,基于SOA的架构已经大行其道。伴随着架构的SOA化,相关联的服务熔断、降级、限流等思想,也在各种技术讲座中频繁出现。
伴随着业务复杂性的提高,系统的不断拆分,一个面向用户端的API,其内部的RPC调用层层嵌套,调用链条可能会非常长。这会造成以下问题:
为了解决上述问题,服务熔断的思想被提出来。类似现实世界中的“保险丝“,当某个异常条件被触发,直接熔断整个服务,而不是一直等到此服务超时。 熔断的触发条件可以依据不同的场景有所不同,比如统计一个时间窗口内失败的调用次数。
有了熔断,就得有降级。所谓降级,就是当某个服务熔断之后,服务器将不再被调用,此时客户端可以自己准备一个本地的fallback回调,返回一个缺省值。 这样做,虽然服务水平下降,但好歹可用,比直接挂掉要强,当然这也要看适合的业务场景。关于Hystrix中fallback的使用,见官网
使用线程隔离或信号隔离的目的是为不同的服务分配一定的资源,当自己的资源用完,直接返回失败而不是占用别人的资源。
?
?
缺点: 线程池隔离的主要缺点是它们增加计算开销(CPU)。每个命令的执行涉及到排队、调度和上下文切换都是在一个单独的线程上运行的。
public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
@Autowired
private MemberService memberService;
/**
* @param group
*/
public OrderHystrixCommand(MemberService memberService) {
super(setter());
this.memberService = memberService;
}
protected JSONObject run() throws Exception {
JSONObject member = memberService.getMember();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
return member;
}
private static Setter setter() {
// 服务分组
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orders");
// 服务标识
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("order");
// 线程池名称
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("order-pool");
// #####################################################
// 线程池配置 线程池大小为10,线程存活时间15秒 队列等待的阈值为100,超过100执行拒绝策略
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
.withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
// ########################################################
// 命令属性配置Hystrix 开启超时
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
// 采用线程池方式实现服务隔离
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
// 禁止
.withExecutionTimeoutEnabled(false);
return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);
}
//############ 服务降级 ##########
@Override
protected JSONObject getFallback() {
// 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法
System.out.println("系统错误!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", 500);
jsonObject.put("msg", "系统错误!");
return jsonObject;
}
}
应用场景:
代码如下:
public class OrderHystrixCommand2 extends HystrixCommand<JSONObject> {
@Autowired
private MemberService memberService;
/**
* @param group
*/
public OrderHystrixCommand2(MemberService memberService) {
super(setter());
this.memberService = memberService;
}
protected JSONObject run() throws Exception {
// Thread.sleep(500);
// System.out.println("orderIndex线程名称" +
// Thread.currentThread().getName());
// System.out.println("success");
JSONObject member = memberService.getMember();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
return member;
}
private static Setter setter() {
// 服务分组
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("order");
// 命令属性配置 采用信号量模式
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
// 使用一个原子计数器(或信号量)来记录当前有多少个线程在运行,当请求进来时先判断计数
// 器的数值,若超过设置的最大线程个数则拒绝该请求,若不超过则通行,这时候计数器+1,请求返 回成功后计数器-1。
.withExecutionIsolationSemaphoreMaxConcurrentRequests(50);
return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
}
//############ 服务降级 ##########
@Override
protected JSONObject getFallback() {
// 如果Hystrix发生熔断,当前服务不可用,直接执行Fallback方法
System.out.println("系统错误!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", 500);
jsonObject.put("msg", "系统错误!");
return jsonObject;
}
}
应用场景:
需求:搭建一套分布式rpc远程通讯案例:比如订单服务调用会员服务实现服务隔离,防止雪崩效应案例
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.5.12</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.12</version>
</dependency>
</dependencies>
@Service
public class MemberService {
public JSONObject getMember() {
JSONObject result = HttpClientUtils.httpGet("http://127.0.0.1:8081/member/memberIndex");
return result;
}
}
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private MemberService memberService;
@RequestMapping("/orderIndex")
public Object orderIndex() throws InterruptedException {
JSONObject member = memberService.getMember();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
return member;
}
@RequestMapping("/orderIndexHystrix")
public Object orderIndexHystrix() throws InterruptedException {
return new OrderHystrixCommand(memberService).execute();
}
@RequestMapping("/orderIndexHystrix2")
public Object orderIndexHystrix2() throws InterruptedException {
return new OrderHystrixCommand2(memberService).execute();
}
@RequestMapping("/findOrderIndex")
public Object findIndex() {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",findOrderIndex");
return "findOrderIndex";
}
}
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private MemberService memberService;
@RequestMapping("/orderIndex")
public Object orderIndex() throws InterruptedException {
JSONObject member = memberService.getMember();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",订单服务调用会员服务:member:" + member);
return member;
}
@RequestMapping("/orderIndexHystrix")
public Object orderIndexHystrix() throws InterruptedException {
return new OrderHystrixCommand(memberService).execute();
}
@RequestMapping("/orderIndexHystrix2")
public Object orderIndexHystrix2() throws InterruptedException {
return new OrderHystrixCommand2(memberService).execute();
}
@RequestMapping("/findOrderIndex")
public Object findIndex() {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",findOrderIndex");
return "findOrderIndex";
}
}
public class HttpClientUtils {
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录
private static RequestConfig requestConfig = null;
static {
// 设置请求和传输超时时间
requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
}
/**
* post请求传输json参数
*
* @param url
* url地址
* @param json
* 参数
* @return
*/
public static JSONObject httpPost(String url, JSONObject jsonParam) {
// post请求返回结果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
// 设置请求和传输超时时间
httpPost.setConfig(requestConfig);
try {
if (null != jsonParam) {
// 解决中文乱码问题
StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/json");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 请求发送成功,并得到响应
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 读取服务器返回过来的json字符串数据
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post请求提交失败:" + url, e);
}
}
} catch (IOException e) {
logger.error("post请求提交失败:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* post请求传输String参数 例如:name=Jack&sex=1&type=2
* Content-type:application/x-www-form-urlencoded
*
* @param url
* url地址
* @param strParam
* 参数
* @return
*/
public static JSONObject httpPost(String url, String strParam) {
// post请求返回结果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
try {
if (null != strParam) {
// 解决中文乱码问题
StringEntity entity = new StringEntity(strParam, "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/x-www-form-urlencoded");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 请求发送成功,并得到响应
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 读取服务器返回过来的json字符串数据
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post请求提交失败:" + url, e);
}
}
} catch (IOException e) {
logger.error("post请求提交失败:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* 发送get请求
*
* @param url
* 路径
* @return
*/
public static JSONObject httpGet(String url) {
// get请求返回结果
JSONObject jsonResult = null;
CloseableHttpClient client = HttpClients.createDefault();
// 发送get请求
HttpGet request = new HttpGet(url);
request.setConfig(requestConfig);
try {
CloseableHttpResponse response = client.execute(request);
// 请求发送成功,并得到响应
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
// 读取服务器返回过来的json字符串数据
HttpEntity entity = response.getEntity();
String strResult = EntityUtils.toString(entity, "utf-8");
// 把json字符串转换成json对象
jsonResult = JSONObject.parseObject(strResult);
} else {
logger.error("get请求提交失败:" + url);
}
} catch (IOException e) {
logger.error("get请求提交失败:" + url, e);
} finally {
request.releaseConnection();
}
return jsonResult;
}
}
@RestController
@RequestMapping("/member")
public class MemberController {
@RequestMapping("/memberIndex")
public Object memberIndex() throws InterruptedException {
Map<String, Object> hashMap = new HashMap<String, Object>();
hashMap.put("code", 200);
hashMap.put("msg", "memberIndex");
Thread.sleep(1500);
return hashMap;
}
}
标签:中间 minutes hashmap request queue 访问 之间 功能 alibaba
原文地址:https://www.cnblogs.com/haoworld/p/hystrix-shi-xian-fu-wu-ge-li-he-jiang-ji.html