流控
请使用 IDEA 工具 结合查看源码
FlowSlot 负责流量控制的 ,他的下一个 Slot 是:DegradeSlot (降级)
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 检测并应用流控规则 checkFlow(resourceWrapper, context, node, count, prioritized); // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); }
.
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }
.
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply(String resource) { // Flow rule map should not be null. // 获取到所有资源的流控规则 // map的key为资源名称,value为该资源上加载的所有流控规则 // 配置使用是调用方法:FlowRuleManager.loadRules(xxx);目的就是将自定义的流控规则载入, // 此处FlowRuleManager.getFlowRuleMap() 将所有流控规则取出 Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); // 获取指定资源的所有流控规则 return flowRules.get(resource); } };
.
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } // 获取到指定资源的所有流控规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { // 逐个应用流控规则。若无法通过则抛出异常,后续规则不再应用 for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }
.
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { return canPassCheck(rule, context, node, acquireCount, false); } public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 从规则中获取要限定的来源 String limitApp = rule.getLimitApp(); // 若限流的来源为null,则请求直接通过 if (limitApp == null) { return true; } // 使用规则处理集群流控 if (rule.isClusterMode()) { return passClusterCheck(rule, context, node, acquireCount, prioritized); } // 使用规则处理单机流控 return passLocalCheck(rule, context, node, acquireCount, prioritized); } private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 通过规则形成选择出的规则node Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); // 若没有选择出node,说明没有规则,则直接返回true,表示通过检测 if (selectedNode == null) { return true; } // 使用规则进行逐项检测 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
四个实现类,正是管理控制台中的流控效果
DefaultController 为快速失败实现方案
// 快速失败的流控效果中的通过性判断 @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 获取当前时间窗中已经统计的数据 int curCount = avgUsedTokens(node); // 若已经统计的数据与本次请求的数量和 大于 设置的阈值,则返回false,表示没有通过检测 // 若小于等于阈值,则返回true,表示通过检测 if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; } private int avgUsedTokens(Node node) { // 若没有选择出node,则说明没有做统计工作,直接返回0 if (node == null) { return DEFAULT_AVG_USED_TOKENS; } // 若阈值类型为线程数,则直接返回当前的线程数量; // 若阈值类型为QPS,则返回统计的当前的QPS return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); }
StatisticNode 中的实现
@Override public double passQps() { // rollingCounterInSecond.pass() 当前时间窗中统计的通过的请求数量 // rollingCounterInSecond.getWindowIntervalInSec() 时间窗长度 // 这两个数相除,计算出的就是QPS return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); }
流控规则对象,管理控制台页面设置流控格则中封装的就是这个对象
public class FlowRule extends AbstractRule { public FlowRule() { super(); setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } public FlowRule(String resourceName) { super(); setResource(resourceName); setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } /** * The threshold type of flow control (0: thread count, 1: QPS). */ // 阈值类型,0表示线程数限流,1表示QPS限流 private int grade = RuleConstant.FLOW_GRADE_QPS; /** * Flow control threshold count. */ // 阈值 private double count; /** * 流控模式 * Flow control strategy based on invocation chain. * 直接流控 * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin); * 关联流控 * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource); * 链路流控 * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource). */ private int strategy = RuleConstant.STRATEGY_DIRECT; /** * Reference resource in flow control with relevant resource or context. */ // 流控模式为关联流控时的关联资源 private String refResource; /** * Rate limiter control behavior. * 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter */ // 流控效果:0表示快速失败,1表示warm up(令牌桶算法),2表示排队等待(漏斗算法),3表示warm up+排队等待 private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; // warm up预热时长 private int warmUpPeriodSec = 10; /** * Max queueing time in rate limiter behavior. */ // 排队等待的超时时间 private int maxQueueingTimeMs = 500; // 是否集群模式 private boolean clusterMode; 。。。。。。。。。。。。。。
降级 ( DegradeSlot )
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 完成熔断降级检测 performChecking(context, resourceWrapper); // 触发下一个节点 fireEntry(context, resourceWrapper, node, count, prioritized, args); } void performChecking(Context context, ResourceWrapper r) throws BlockException { // 获取到当前资源的所有熔断器 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); // 若熔断器为空,则直结束 if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } // 逐个尝试所有熔断器 for (CircuitBreaker cb : circuitBreakers) { // 若没有通过当前熔断器,则直接抛出异常 if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }
.
@Override public boolean tryPass(Context context) { // Template implementation. // 熔断器状态为关闭状态,则请求可以通过 if (currentState.get() == State.CLOSED) { return true; } // 熔断器状态为打开状态,此时再查看, // 若下次时间窗时间点已经到达,且熔断器成功由Open变为了Half-Open,则请求通过 if (currentState.get() == State.OPEN) { // For half-open state we allow a request for probing. return retryTimeoutArrived() && fromOpenToHalfOpen(context); } return false; }
>>>>>>> 接下一篇:https://www.cnblogs.com/Alay/p/15488154.html <<<<<<<<<<<<<