标签:cep 服务器 并且 led this 取出 说明 key info
负载均衡算法模块主要的功能是从负载均衡器中获取服务器列表信息,根据算法选取出一个服务器。
IRule
负载均衡算法接口
public interface IRule{ public Server choose(Object key);//选择一个服务器 public void setLoadBalancer(ILoadBalancer lb);//设置负载均衡器 public ILoadBalancer getLoadBalancer(); //获取负载均衡器 }
通过BaseLoadBalancer的setRule或构造函数来为BaseLoadBalancer添加IRule
public void setRule(IRule rule) { if (rule != null) { this.rule = rule; } else { /* default rule */ this.rule = new RoundRobinRule(); } if (this.rule.getLoadBalancer() != this) { this.rule.setLoadBalancer(this); } }
RandomRule
生成一个随机数,从负载均衡器中选取一个服务器。
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int index = rand.nextInt(serverCount); server = upList.get(index); if (server == null) { Thread.yield(); continue; } if (server.isAlive()) { return (server); } server = null; Thread.yield(); } return server; }
RoundRobinRule
轮询从负载均衡器中选取一个服务器。
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; }
BestAvailableRule
选择并发量最小且没有被熔断的服务器,需要使用到LoadBalancerStats来获取服务器的状态。
public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key); } List<Server> serverList = getLoadBalancer().getAllServers(); int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; for (Server server: serverList) { ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } }
WeightedResponseTimeRule
按照响应时间的比例来选择服务器。首先内部会有一个定时器,定时从负载均衡器里面读取服务器的平均响应时间,然后根据平均响应时间转换成权重。
class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } class ServerWeight { public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { return; } double totalResponseTime = 0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { // this will automatically load the stats if not in cache ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // weight for each server is (sum of responseTime of all servers - responseTime) // so that the longer the response time, the less the weight and the less likely to be chosen Double weightSoFar = 0.0; // create new list and hot swap the reference List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } } }
然后根据权重来选择服务器
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { // get hold of the current reference in case it is changed from the other thread List<Double> currentWeights = accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; // last one in the list is the sum of all weights double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // No server has been hit yet and total weight is not initialized // fallback to use round robin if (maxTotalWeight < 0.001d) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive) double randomWeight = random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex int n = 0; for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } } server = allList.get(serverIndex); } if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Next. server = null; } return server; }
AvailabilityFilteringRule
使用RoundRobinRule来选择服务器,并且通过AvailabilityPredicate进行筛选。AvailabilityPredicate会剔除熔断的和超过指定并发量的server。
public Server choose(Object key) { int count = 0; Server server = roundRobinRule.choose(key); while (count++ <= 10) { if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); }
AvailabilityPredicate:
public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; }
使用AvailabilityFilteringRule涉及配置:
属性 | 实现 | 默认值 |
niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped | 是否剔除熔断server | true |
niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit |
最大连接数 | Integer.MAX_VALUE |
ZoneAvoidanceRule
public boolean apply(@Nullable PredicateKey input) { if (!ENABLED.get()) { return true; } String serverZone = input.getServer().getZone(); if (serverZone == null) { // there is no zone information from the server, we do not want to filter // out this server return true; } LoadBalancerStats lbStats = getLBStats(); if (lbStats == null) { // no stats available, do not filter return true; } if (lbStats.getAvailableZones().size() <= 1) { // only one zone is available, do not filter return true; } Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); if (!zoneSnapshot.keySet().contains(serverZone)) { // The server zone is unknown to the load balancer, do not filter it out return true; } logger.debug("Zone snapshots: {}", zoneSnapshot); Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null) { return availableZones.contains(input.getServer().getZone()); } else { return false; } }
ZoneAvoidancePredicate:
属性 | 说明 | 默认值 |
ZoneAwareNIWSDiscoveryLoadBalancer.triggeringLoadPerServerThreshold |
||
ZoneAwareNIWSDiscoveryLoadBalancer.avoidZoneWithBlackoutPercetage | ||
niws.loadbalancer.zoneAvoidanceRule.enabled |
使用ZoneAvoidanceRule涉及配置:
类图
标签:cep 服务器 并且 led this 取出 说明 key info
原文地址:https://www.cnblogs.com/zhangwanhua/p/7994758.html