标签:适配 doc false get toc lock 应用 task 对象
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName
),每次资源调用都会创建一个 Entry
对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU
API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;LogSlot
则用于打印异常日志;StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息;SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级;Sentinel 将 ProcessorSlot
作为 SPI 接口进行扩展(1.7.2 版本以前 SlotChainBuilder
作为 SPI),使得 Slot Chain 具备了扩展的能力。我们可以自行加入自定义的 slot 并编排 slot 间的顺序,从而可以给 Sentinel 添加自定义的功能。比如我不希望sentinel具有限流功能,可以引入自定义的ChainBuilder代码如下:
public class SentinelChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new DegradeSlot()); return chain; } }
然后在MATE-INF中引入加入下图文件即可
Sentinel 的核心骨架,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。核心结构如下图:
resource是sentinel中最重要的一个概念,sentinel通过资源来保护具体的业务代码或其他后方服务。我们再控制台上实际配置的都是一个个资源。也可以使用@SentinelResource注解来自定义资源。
Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry
。Context 维持着入口节点(entranceNode
)、本次调用链路的 curNode、调用来源(origin
)等信息。Context 名称即为调用链路入口名称。
每一次资源调用都会创建一个 Entry
。Entry
包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
关于sentinel限流的一些基本常识,可参见官方文档,此处不多做赘述,附上链接如下:https://github.com/alibaba/Sentinel/wiki/%E6%B5%81%E9%87%8F%E6%8E%A7%E5%88%B6
对于sentinel中的http请求限流,我们再使用过程中,引入依赖包后,不需要新增任何代码,只需要在sentinel的配置面板中新增限流规则即可。其具体实现,在于sentinel的sentinel-web-serverlet包中,CommonFilter类实现类javax.servlet.Filter接口,在请求处理之前做了一次拦截。具体代码如下:
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest sRequest = (HttpServletRequest)request; Entry urlEntry = null; try { String target = FilterUtil.filterTarget(sRequest); // Clean and unify the URL. // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or // the amount of context and resources will exceed the threshold. UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner(); if (urlCleaner != null) { target = urlCleaner.clean(target); } // If you intend to exclude some URLs, you can convert the URLs to the empty string "" // in the UrlCleaner implementation. if (!StringUtil.isEmpty(target)) { // Parse the request origin using registered origin parser. String origin = parseOrigin(sRequest); ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin); if (httpMethodSpecify) { // Add HTTP method prefix if necessary. String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target; urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN); } else { // 进入sentinel流控的核心方法 urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN); } } chain.doFilter(request, response); } catch (BlockException e) { HttpServletResponse sResponse = (HttpServletResponse)response; // Return the block page, or redirect to another URL. WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e); } catch (IOException | ServletException | RuntimeException e2) { Tracer.traceEntry(e2, urlEntry); throw e2; } finally { if (urlEntry != null) { urlEntry.exit(); } ContextUtil.exit(); } }
此处把请求路径作为资源点下传。其中实际核心代码是在圈出处进入SphU中(sentienel中限流主要通过SphU.entry与SphO.entry作为入口,前者限流抛出BlockException,后者限流返回false)。查看CtSph方法,可见实际sentinel各核心组件串联方法如下:
//这里传入得参数count是1,prioritized=false,args是容量为1的空数组 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //获取当前线程的上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } //为空的话,创建一个默认的context if (context == null) { //1 // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) {//这里会返回false return new CtEntry(resourceWrapper, null, context); } //创建一系列功能插槽 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ //如果超过了插槽的最大数量,那么会返回null if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 获取entry Entry e = new CtEntry(resourceWrapper, chain, context); try { //调用责任链 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
3.2限流原理分析
上图中已经会调用ProcessorSlot.entry进入一系列功能插槽(slot chain)中,其中限流规则检查是FlowSlot,FlowSlot中调用FlowRuleChecker.checkFlow方法,进行实质的降级检查,代码如下:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //返回FlowRuleManager里面注册的所有规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //如果当前的请求不能通过,那么就抛出FlowException异常 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } // 定义一个Function,此处的ruleProvider即是上文中的ruleProvider private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply(String resource) { // Flow rule map should not be null. Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } }; // 真实进行限流校验的方法 public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //如果没有设置limitapp,那么不进行校验,控制台配置时,默认会给个defualt String limitApp = rule.getLimitApp(); if (limitApp == null) { return true; } //集群模式 if (rule.isClusterMode()) { return passClusterCheck(rule, context, node, acquireCount, prioritized); } //本地模式 return passLocalCheck(rule, context, node, acquireCount, prioritized); }
上述代码中会通过rule.isClusterMode方法判断是分布式限流还是本地限流,下面我们只看本地限流的代码
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //节点选择 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } //根据设置的规则来拦截 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
其中selectNodeByRequesterAndStrategy方法用于选择统计数据的载体用户,rule.getRater用于选择流控的方式(快速失败,预热,令牌桶)。selectNodeByRequesterAndStrategy代码如下:
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { // 控制台中配置的限流针对的来源 String limitApp = rule.getLimitApp(); // 关系限流策略 int strategy = rule.getStrategy(); // 请求的来源 String origin = context.getOrigin(); //origin不为`default` or `other`,并且limitApp和origin相等 if (limitApp.equals(origin) && filterOrigin(origin)) {//1 if (strategy == RuleConstant.STRATEGY_DIRECT) { // Matches limit origin, return origin statistic node. return context.getOriginNode(); } //关系限流策略为关联或者链路的处理 return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//2 if (strategy == RuleConstant.STRATEGY_DIRECT) { //这里返回ClusterNode,表示所有应用对该资源的所有请求情况 // Return the cluster node. return node.getClusterNode(); } //关系限流策略为关联或者链路的处理 return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//3 if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } //关系限流策略为关联或者链路的处理 return selectReferenceNode(rule, context, node); } return null; }
此方法的中的limitApp为限制的资源,strategy为流控的类型(调用方,关联,链路),origin为调用来源。最后我们看一下立即通过时的代码
public boolean canPass(Node node, int acquireCount, boolean prioritized) { //判断是限流还是限制并发数量,然后获取流量或并发数量 int curCount = avgUsedTokens(node); //如果两者相加大于限定的并发数 if (curCount + acquireCount > count) { // 如果此请求是一个高优先级请求,并且限流类型为qps,则不会立即失败,而是去占用未来的窗口 if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
接下来重点来了,比如rt降级规则,我配置的是1ms超时,而此接口实际rt远高于1ms。但是当我测试接口的时候,发现实际并没有进入降级处理方法中。但是人举国猛点则会进入降级代码,查看sentinel的github上文档后发现sentinel对于rt降级策略如下:
所以需要多次触发才能进入熔断机制
异常数量降级需要注意,此时的时间窗口是分钟机制,官方文档如下:
异常比例熔断也存在最小次数的机制。
异常数熔断由于窗口是分钟,所以集中出现异常时,如果熔断时间不到1min,比如10s,那么熔断结束后,向前推一分钟内异常数可能依旧高于最大数,会再次进入熔断逻辑。
附上sentinel的熔断降级熔断文档如下:
https://github.com/alibaba/Sentinel/wiki/%E7%86%94%E6%96%AD%E9%99%8D%E7%BA%A7
对于熔断,为了更好的配置熔断发生后的接口返回值,我们通常使用的@SentinelResource注解,并指定blockHandler方法处理熔断发生后的回执。对于@SentinelResource注解,查看sentinel源码可发现其切面如下:
@Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 进入sentinel限流方法 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); Object result = pjp.proceed(); return result; } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex, annotation); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { // exit时进行数据统计 if (entry != null) { entry.exit(1, pjp.getArgs()); } } }
发现其对于@SentinelResource注解切面中,会在进入真实方法调用前,调用SphU.entry(resourceName, resourceType, entryType, pjp.getArgs())方法,根据前文分析可知,此处会调用ProcessorSlot.entry进入一系列功能插槽(slot chain)中,其中降级规则检查是DegradeSlot,DegradeSlot中调用DegradeRuleManager.checkDegrade方法,进行实质的降级检查,代码如下:
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { //根据resource来获取降级策略 Set<DegradeRule> rules = degradeRules.get(resource.getName()); if (rules == null) { return; } for (DegradeRule rule : rules) { if (!rule.passCheck(context, node, count)) { throw new DegradeException(rule.getLimitApp(), rule); } } }
其中degradeRules.get方法通过资源名称,查看此资源对应的降级规则列表。DegradeRule.passCheck方法为实际降级检查。
熔断相关代码如下:
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { //返回false直接进行降级 if (cut.get()) { return false; } //降级是根据资源的全局节点来进行判断降级策略的 ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); if (clusterNode == null) { return true; } //根据响应时间降级策略 if (grade == RuleConstant.DEGRADE_GRADE_RT) { //获取节点的平均响应时间 double rt = clusterNode.avgRt(); if (rt < this.count) { passCount.set(0); return true; } //rtSlowRequestAmount默认是5 // Sentinel will degrade the service only if count exceeds. if (passCount.incrementAndGet() < rtSlowRequestAmount) { return true; } // 根据异常比例降级 } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { //获取每秒异常的次数 double exception = clusterNode.exceptionQps(); //获取每秒成功的次数 double success = clusterNode.successQps(); //获取每秒总调用次数 double total = clusterNode.totalQps(); // If total amount is less than minRequestAmount, the request will pass. // 如果总调用次数少于5,那么不进行降级 if (total < minRequestAmount) { return true; } // In the same aligned statistic time window, // "success" (aka. completed count) = exception count + non-exception count (realSuccess) // 获取真实成功数 double realSuccess = success - exception; if (realSuccess <= 0 && exception < minRequestAmount) { return true; } if (exception / success < count) { return true; } // 根据异常数降级 } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { double exception = clusterNode.totalException(); if (exception < count) { return true; } } //根据设置的时间窗口进行重置 if (cut.compareAndSet(false, true)) { ResetTask resetTask = new ResetTask(this); pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS); } return false; }
在接资源点调用结束后,会进入com.alibaba.csp.sentinel.Entry.exit方法,此方法最后会调用StatisticNode.addRtAndSuccess方法记录成功数和RT。具体如下:
@Override public void addRtAndSuccess(long rt, int successCount) { rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); }
标签:适配 doc false get toc lock 应用 task 对象
原文地址:https://www.cnblogs.com/fbw-gxy/p/13096708.html