Sentinel 之 流控、降级

流控

请使用 IDEA 工具 结合查看源码

FlowSlot 负责流量控制的 ,他的下一个 Slot 是:DegradeSlot (降级)

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 检测并应用流控规则
    checkFlow(resourceWrapper, context, node, count, prioritized);
    // 触发下一个Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

.

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

 

.

private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        // 获取到所有资源的流控规则
        // map的key为资源名称,value为该资源上加载的所有流控规则
        // 配置使用是调用方法:FlowRuleManager.loadRules(xxx);目的就是将自定义的流控规则载入,
        // 此处FlowRuleManager.getFlowRuleMap() 将所有流控规则取出
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        // 获取指定资源的所有流控规则
        return flowRules.get(resource);
    }
};

.

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // 获取到指定资源的所有流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        // 逐个应用流控规则。若无法通过则抛出异常,后续规则不再应用
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

.

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
                                                int acquireCount) {
    return canPassCheck(rule, context, node, acquireCount, false);
}
 
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    // 从规则中获取要限定的来源
    String limitApp = rule.getLimitApp();
    // 若限流的来源为null,则请求直接通过
    if (limitApp == null) {
        return true;
    }
 
    // 使用规则处理集群流控
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
 
    // 使用规则处理单机流控
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
 
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // 通过规则形成选择出的规则node
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    // 若没有选择出node,说明没有规则,则直接返回true,表示通过检测
    if (selectedNode == null) {
        return true;
    }
 
    // 使用规则进行逐项检测
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

四个实现类,正是管理控制台中的流控效果

Sentinel 之 流控、降级

Sentinel 之 流控、降级

DefaultController 为快速失败实现方案

// 快速失败的流控效果中的通过性判断
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 获取当前时间窗中已经统计的数据
    int curCount = avgUsedTokens(node);
    // 若已经统计的数据与本次请求的数量和 大于 设置的阈值,则返回false,表示没有通过检测
    // 若小于等于阈值,则返回true,表示通过检测
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);
 
                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
 
private int avgUsedTokens(Node node) {
    // 若没有选择出node,则说明没有做统计工作,直接返回0
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    // 若阈值类型为线程数,则直接返回当前的线程数量;
    // 若阈值类型为QPS,则返回统计的当前的QPS
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

StatisticNode 中的实现

 

@Override
public double passQps() {
    // rollingCounterInSecond.pass() 当前时间窗中统计的通过的请求数量
    // rollingCounterInSecond.getWindowIntervalInSec() 时间窗长度
    // 这两个数相除,计算出的就是QPS
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

流控规则对象,管理控制台页面设置流控格则中封装的就是这个对象

public class FlowRule extends AbstractRule {
 
    public FlowRule() {
        super();
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }
 
    public FlowRule(String resourceName) {
        super();
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }
 
    /**
     * The threshold type of flow control (0: thread count, 1: QPS).
     */
    // 阈值类型,0表示线程数限流,1表示QPS限流
    private int grade = RuleConstant.FLOW_GRADE_QPS;
 
    /**
     * Flow control threshold count.
     */
    // 阈值
    private double count;
 
    /**
     *  流控模式
     * Flow control strategy based on invocation chain.
     * 直接流控
     * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
     * 关联流控
     * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
     * 链路流控
     * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;
 
    /**
     * Reference resource in flow control with relevant resource or context.
     */
    // 流控模式为关联流控时的关联资源
    private String refResource;
 
    /**
     * Rate limiter control behavior.
     * 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
     */
    // 流控效果:0表示快速失败,1表示warm up(令牌桶算法),2表示排队等待(漏斗算法),3表示warm up+排队等待
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
 
    // warm up预热时长
    private int warmUpPeriodSec = 10;
 
    /**
     * Max queueing time in rate limiter behavior.
     */
    // 排队等待的超时时间
    private int maxQueueingTimeMs = 500;
 
    // 是否集群模式
    private boolean clusterMode;
 
 。。。。。。。。。。。。。。

 


降级 ( DegradeSlot )

 

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 完成熔断降级检测
    performChecking(context, resourceWrapper);
    // 触发下一个节点
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
 
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    // 获取到当前资源的所有熔断器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    // 若熔断器为空,则直结束
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    // 逐个尝试所有熔断器
    for (CircuitBreaker cb : circuitBreakers) {
        // 若没有通过当前熔断器,则直接抛出异常
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

.

@Override
public boolean tryPass(Context context) {
    // Template implementation.
    // 熔断器状态为关闭状态,则请求可以通过
    if (currentState.get() == State.CLOSED) {
        return true;
    }
 
    // 熔断器状态为打开状态,此时再查看,
    // 若下次时间窗时间点已经到达,且熔断器成功由Open变为了Half-Open,则请求通过
    if (currentState.get() == State.OPEN) {
        // For half-open state we allow a request for probing.
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}

 

>>>>>>> 接下一篇:https://www.cnblogs.com/Alay/p/15488154.html <<<<<<<<<<<<<

 

上一篇:Photoshop将普通照片处理为圣诞夜景


下一篇:Redis之哨兵模式