微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

5.4 Sentinel 流控、统计和熔断的源码分析


前言

参考资料
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
《Sentinel GitHub 官网》
《Sentinel 官网》

调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起;

本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;


1. Sentinel 的自动装配

1.2 依赖引入

  • 我们引入 Sentinel 的 starter 依赖文件,不需要太多额外操作,即可使用 Sentinel 默认自带的限流功能,原因是这些配置和功能都给我们自动装配了;
  • 在 Spring-Cloud-Alibaba-Sentinel 包下的 META-INF/spring.factories 文件里定义了会自动装配哪些类;

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

  • SentinelWebAutoConfiguration:对 Web Servlet 环境的支持;
  • SentinelWebFluxAutoConfiguration:对 Spring WebFlux 的支持;
  • SentinelEndpointAutoConfiguration:暴露 Endpoint 信息;
  • SentinelFeignAutoConfiguration:用于适应 Feign 组件;
  • SentinelAutoConfiguration:支持对 RestTemplate 的服务调用使用 Sentinel 进行保护;

1.3 SentinelWebAutoConfiguration 配置类

  • SentinelWebAutoConfiguration 配置类中自动装配了一个 FilterRegistrationBean,其主要作用是注册一个 CommonFilter,并且默认情况下通过 /* 规则拦截所有的请求;
@Configuration
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration {
    
    //省略其他代码
    
	@Bean
	@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
	public FilterRegistrationBean sentinelFilter() {
		FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();

		SentinelProperties.Filter filterConfig = properties.getFilter();

		if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
			List<String> defaultPatterns = new ArrayList<>();
			//默认情况下通过 /* 规则拦截所有的请求
			defaultPatterns.add("/*");
			filterConfig.setUrlPatterns(defaultPatterns);
		}

		registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0]));
		//【点进去】注册 CommonFilter
		Filter filter = new CommonFilter();
		registration.setFilter(filter);
		registration.setOrder(filterConfig.getOrder());
		registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify()));
		log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
		return registration;
	}
}

1.4 CommonFilter 过滤器

  • CommonFilter 过滤器的作用与源码如下:
    • 从请求中获取目标 URL;
    • 获取 Urlcleaner;
    • 对当前 URL 添加限流埋点;
public class CommonFilter implements Filter {
    
    //省略部分代码

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest)request;
        Entry urlEntry = null;
        try {
            //解析请求 URL
            String target = FilterUtil.filterTarget(sRequest);
            //URL 清洗
            UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
            if (urlCleaner != null) {
                //如果存在,则说明配置过 URL 清洗策略,替换配置的 targer
                target = urlCleaner.clean(target);
            }
            if (!StringUtil.isEmpty(target)) {
                String origin = this.parseOrigin(sRequest);
                ContextUtil.enter("sentinel_web_servlet_context", origin);
                if (this.httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
                    //使用 SphU.entry() 方法对 URL 添加限流埋点
                    urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
                } else {
                    urlEntry = SphU.entry(target, 1, EntryType.IN);
                }
            }
            //执行过滤
            chain.doFilter(request, response);
        } catch (BlockException var14) {
            HttpServletResponse sResponse = (HttpServletResponse)response;
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
        } catch (ServletException | RuntimeException | IOException var15) {
            Tracer.traceEntry(var15, urlEntry);
            throw var15;
        } finally {
            if (urlEntry != null) {
                urlEntry.exit();
            }
            ContextUtil.exit();
        }
    }
}

1.5 小结

  • 对于 Web Servlet 环境,只是通过 Filter 的方式将所有请求自动设置为 Sentinel 的资源,从而达到限流的目的;

2. 获取 ProcessorSlot 链

  • Sentinel 的工作原理主要依靠 ProcessorSlot 链,遍历链中的每一个 Slot 槽,执行相应逻辑;

2.1 Sentinel 源码包结构

  • 在 DeBug 之前,我们需要对 Sentinel 的源码包结构做个分析,以找到方法的入口;
模块名 说明
sentinel-adapter 负责针对主流开源框架进行限流适配,如:Dubbo、gRPC、Zuul 等;
sentinel-core Sentinel 核心库,提供限流、熔断等实现;
sentinel-dashboard 控制台模块,提供可视化监控和管理;
sentinel-demo 官方案例;
sentinel-extension 实现不同组件的数据源扩展,如:Nacos、ZooKeeper、Apollo 等;
sentinel-transport 通信协议处理模块;
  • Slot 槽是 Sentinel 的核心,因此方法的入口在 sentinel-core 核心库,里面有好多个 SphU.entry() 方法,我们给方法打上断点,DeBug 进入,然后登录 Sentinel 控制台;

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

2.2 获取 ProcessorSlot 链与操作 Slot 槽的入口 CtSph.entryWithPriority()

  • 一直进入最终方法的实现在 CtSph.entryWithPriority() 方法里,其主要逻辑与源码如下:
    • 校验全局上下文 context;
    • 构造 ProcessorSlot 链;
    • 遍历 ProcessorSlot 链操作 Slot 槽(遍历链表);
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        //上下文量已经超过阈值 -> 只初始化条目,不进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        //没有指定上下文 -> 使用默认上下文 context
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
     
     if (!Constants.ON) {
        //全局开关关闭 -> 没有规则检查
        return new CtEntry(resourceWrapper, null, context);
    }
    //【断点步入 2.2.1】通过 lookProcessChain 方法获取 ProcessorSlot 链
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);


    if (chain == null) {
        //表示资源量超过 Constants.MAX_SLOT_CHAIN_SIZE 常量 -> 不会进行规则检查
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //【断点步入 3./4./5.】执行 ProcessorSlot 对 ProcessorSlot 链中的 Slot 槽遍历操作(遍历链表的方式)
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        //这种情况不应该发生,除非 Sentinel 内部存在错误
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

2.2.1 构造 ProcessorSlot 链 CtSph.lookProcessChain()

  • 进入 CtSph.lookProcessChain() 方法;
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //从缓存中获取 slot 调用链
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                //【断点步入】构造 Slot 链(责任链模式)
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
  • 最终调用 DefaultSlotChainBuilder.build() 方法构造 DefaultProcessorSlotChain;
@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }
    return chain;
}
  • 可以看到最后 ProcessorSlotChain 链中有 10 个 Slot 插槽:
  • 在本篇笔记中我们关注 3 个槽:
    • FlowSlot:进行流控规则校验,对应本篇《3. 流控槽实施流控逻辑》;
    • StatisticSlot:实现指标数据的统计,对应本篇《4. 统计槽实施指标数据统计》;
    • DegradeSlot:服务熔断,对应本篇《5. 熔断槽实施服务熔断》

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

2.2.2 操作 Slot 槽的入口

  • 操作 Slot 槽的入口方法是:ProcessorSlot.entry()
  • 接着会以遍历链表的方式操作每个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应下面的《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;

3. 流控槽实施流控逻辑 FlowSlot.entry()

  • 进入 ProcessorSlot.entry() 方法,它会遍历每个 Slot 插槽,并对其进行操作,其中会经过 FlowSlot.entry() 方法(需要提前给该方法打上断点),方法的逻辑跟源码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】检查流量规则
    checkFlow(resourceWrapper, context, node, count, prioritized);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  • 进入 FlowSlot.checkFlow() 方法,最终调用 FlowRuleChecker.checkFlow() 方法,方法的逻辑和源码如下:
    • 遍历所有流控规则 FlowRule;
    • 针对每个规则调用 canPassCheck 进行校验;
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //【断点步入 3.1】获取流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        //遍历所有流控规则 FlowRule
        for (FlowRule rule : rules) {
            //【点进去 3.2】校验每条规则
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

3.1 获取流控规则 FlowSlot.ruleProvider.apply()

  • 进入 FlowSlot.ruleProvider.apply() 方法,获取到 Sentinel 控制台上的流控规则;
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

3.2 校验每条规则 FlowRuleChecker.canPassCheck()

  • 进入 FlowRuleChecker.canPassCheck() 方法,分集群和单机模式校验每条规则;
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    //集群模式
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    //【点进去】单机模式
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
  • 由于我们是单机模式,进入 FlowRuleChecker.passLocalCheck() 方法,其主要逻辑和源码如下:
    • 根据来源和策略获取 Node,从而拿到统计的 runtime 信息;
    • 使用流量控制器检查是否让流量通过;
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    //【点进去 3.2.1】获取 Node
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //【点进去 3.2.2】获取流控的处理策略
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

3.2.1 获取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()

  • 进入 FlowRuleChecker.selectNodeByRequesterAndStrategy() 方法,其根据 FlowRule 中配置的 Strategy 和 limitApp 属性,返回不同处理策略的 Node;
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    //limitApp 不能为空
    String limitApp = rule.getLimitApp();
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();
    
    //场景1:限流规则设置了具体应用,如果当前流量就是通过该应用的,则命中场景1
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
    //场景2:限流规则未指定任何具体应,默认为default,则当前流量直接命中场景2
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node.
            return node.getClusterNode();
        }

        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
    //场景3:限流规则设置的是other,当前流量未命中前两种场景
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}
  • 假设我们对接口 UserService 配置限流 1000 QPS,这 3 种场景分别如下:
    • 场景 1:目的是优先保障重要来源的流量。我们需要区分调用来源,将限流规则细化。对A应用配置500QPS,对B应用配置200QPS,此时会产生两条规则:A应用请求的流量限制在500,B应用请求的流量限制在200;
    • 场景 2:没有特别重要来源的流量。我们不想区分调用来源,所有入口调用 UserService 共享一个规则,所有 client 加起来总流量只能通过 1000 QPS;
    • 场景 3:配合第1种场景使用,在长尾应用多的情况下不想对每个应用进行设置,没有具体设置的应用都将命中;

3.2.2 获取流控的处理策略 `FlowRule.getRater().canPass()

  • 进入 FlowRule.getRater().canPass() 方法,首先通过 FlowRule.getRater() 获得流控行为 TrafficShapingController,这是一个接口,有四种实现类,如下图所示:

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

  • 有以下四种处理策略:
    • DefaultController:直接拒绝;
    • RateLimiterController:匀速排队;
    • WarmUpController:冷启动(预热);
    • WarmUpRateLimiterController:匀速+冷启动。
  • 最终调用 TrafficShapingController.canPass() 方法,执行流控行为;

4. 统计槽实施指标数据统计 StatisticSlot.entry()

  • 限流的核心是限流算法的实现,Sentinel 默认采用滑动窗口算法来实现限流,具体的指标数据统计由 StatisticSlot 实现;
  • 我们给 StatisticSlot.entry() 方法里的语句打上断点,运行到光标处;
  • StatisticSlot.entry() 方法的核心是使用 Node 统计“增加线程数”和“请求通过数”;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        //先执行后续 Slot 检查,再统计数据(即先调用后续所有 Slot)
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        //【断点步入】使用 Node 统计“增加线程数”和“请求通过数”
        node.increaseThreadNum();
        node.addPassRequest(count);

        //如果存在来源节点,则对来源节点增加线程数和请求通过数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        
        //如果是入口流量,则对全局节点增加线程数和请求通过数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理优先级等待异常    
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        //如果有来源节点,则对来源节点增加线程数
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        //如果是入口流量,对全局节点增加线程数
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        //执行事件通知和回调函数
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    //处理限流、熔断等异常    
    } catch (BlockException e) {
        
        //省略
        
        throw e;
    //处理业务异常    
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        throw e;
    }
}

4.1 统计“增加线程数”和“请求通过数”

  • 这两个方法都是调用同一个类的,笔者以第一个为例,进入 DefaultNode.increaseThreadNum() 方法,最终调用的是 StatisticNode.increaseThreadNum(),而统计也是依靠 StatisticNode 维护的,这里放上 StatisticNode 的统计核心与源码:
    • StatisticNode 持有两个计数器 Metric 对象,统计行为是通过 Metric 完成的;
public class StatisticNode implements Node {

    //省略其他代码

    //【断点步入】最近 1s 滑动窗口计数器(默认 1s)
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    //最近 1min 滑动窗口计数器(默认 1min)
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    
    //增加 “请求通过数” 
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    //增加 RT 和成功数
    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);
        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

    //增加“线程数”
    @Override
    public void increaseThreadNum() {
        curThreadNum.increment();
    }
}
  • 这里还有减少请求通过数(线程数)、统计最大值等方法,由于篇幅有限,这里不放出,感兴趣的读者可以自己 DeBug 进入看看;

4.2 数据统计的数据结构

4.2.1 ArrayMetric 指标数组

  • ArrayMetric 的构造方法需要先给方法打上断点,重新 DeBug,在初始化时注入构造;
public class ArrayMetric implements Metric {
    
    //省略其他代码

    //【点进去 4.2.2】数据存储
    private final LeapArray<MetricBucket> data;
    
    //最近 1s 滑动计数器用的是 OccupiableBucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    //最近 1min 滑动计数器用的是 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    //增加成功数
    @Override
    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }

    //增加通过数
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

    //增加 RT
    @Override
    public void addRT(long rt) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addRT(rt);
    }
}

4.2.2 LeapArray 环形数组

  • LeapArray 是处理数据的核心数据结构,采用滑动窗口算法;
  • ArrayMetric 中持有 LeapArray 对象,所有方法都是对 LeapArray 进行操作;
  • LeapArray 是环形的数据结构,为了节约内存,它存储固定个数的窗口对象 WindowWrap,只保存最近一段时间的数据,新增的时间窗口会覆盖最早的时间窗口;
public abstract class LeapArray<T> {

    //省略其他代码

    //单个窗口的长度(1个窗口多长时间)
    protected int windowLengthInMs;
    //采样窗口个数
    protected int sampleCount;
    //全部窗口的长度(全部窗口多长时间)
    protected int intervalInMs;
    private double intervalInSecond;
    //窗口数组:存储所有窗口(支持原子读取和写入)
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    //更新窗口数据时用的锁
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        //计算单个窗口的长度
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    //【点进去 4.2.3】获取当前窗口
    public WindowWrap<T> currentWindow() {
        //这里参数是当前时间
        return currentWindow(TimeUtil.currentTimeMillis());
    }
    //获取指定时间的窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 计算数组下标
        int idx = calculateTimeIdx(timeMillis);
        //计算当前请求对应的窗口开始时间
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * 从 array 中获取窗口。有 3 种情况:
         * (1) array 中窗口不在,创建一个 CAS 并写入 array;
         * (2) array 中窗口开始时间 = 当前窗口开始时间,直接返回;
         * (3) array 中窗口开始时间 < 当前窗口开始时间,表示 o1d 窗口已过期,重置窗口数据并返回;
         */
        while (true) {
            // 取窗口
            WindowWrap<T> old = array.get(idx);
            //(1)窗口不在
            if (old == null) {
                //创建一个窗口
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                //CAS将窗口写进 array 中并返回(CAS 操作确保只初始化一次)
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    //并发写失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候 array 中有数据了会命中第2种情况;
                    Thread.yield();
                }
            //(2)array 中窗口开始时间 = 当前窗口开始时间
            } else if (windowStart == old.windowStart()) {
                //直接返回
                return old;
            //(3)array 中窗口开始时间 < 当前窗口开始时间    
            } else if (windowStart > old.windowStart()) {
                //尝试获取更新锁
                if (updateLock.tryLock()) {
                    try {
                        //拿到锁的线程才重置窗口
                        return resetWindowTo(old, windowStart);
                    } finally {
                        //释放锁
                        updateLock.unlock();
                    }
                } else {
                    //并发加锁失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候因为 old 对象时间更新了会命中第 2 种情况;
                    Thread.yield();
                }
            //理论上不会出现    
            } else if (windowStart < old.windowStart()) {
                // 正常情况不会进入该分支(机器时钟回拨等异常情况)
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    //计算索引
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        //timeId 降低时间精度
        long timeId = timeMillis / windowLengthInMs;
        //计算当前索引,这样我们就可以将时间戳映射到 leap 数组
        return (int)(timeId % array.length());
    }
    //计算窗口开始时间
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}

4.2.3 WindowWrap 窗口包装类

  • WindowWrap 是一个窗口对象,它是一个包装类,包装的对象是 MetricBucket
public class WindowWrap<T> {
    //窗口长度,与 LeapArray 的 windowLengthInMs 一致
    private final long windowLengthInMs;
    //窗口开始时间,其值是 windowLengthInMs 的整数倍
    private long windowStart;
    //窗口的数据,支持 MetricBucket 类型,存储统计数据
    private T value;

    //省略其他代码
}

4.2.4 MetricBucket 指标桶

  • MetricBucket 类的定义如下,可以发现指标数据存在 LongAdder[] counters中;
  • LongAdder 是 JDK1.8 中新增的类,用于在高并发场景下代替AtomicLong,以用空间换时间的方式降低了 CAS 失败的概率,从而提高性能;
public class MetricBucket {
    /**
     * 存储指标的计数器;
     * LongAdder 是线程安全的计数器
     * counters[0]  PASS 通过数;
     * counters[1]  BLOCK 拒绝数;
     * counters[2]  EXCEPTION 异常数;
     * counters[3]  SUCCESS 成功数;
     * counters[4]  RT 响应时长;
     * counters[5]  OCCUPIED_PASS 预分配通过数;
     **/
    private final LongAdder[] counters;

    //最小 RT,默认值是 5000ms
    private volatile long minRt;

    //构造中初始化
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }

    //覆盖指标
    public MetricBucket reset(MetricBucket bucket) {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
            counters[event.ordinal()].add(bucket.get(event));
        }
        initMinRt();
        return this;
    }

    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }

    //重置指标为0
    public MetricBucket reset() {
        for (MetricEvent event : MetricEvent.values()) {
            counters[event.ordinal()].reset();
        }
        initMinRt();
        return this;
    }
    //获取指标,从 counters 中返回
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
    //添加指标
    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    public long pass() {
        return get(MetricEvent.PASS);
    }

    public long block() {
        return get(MetricEvent.BLOCK);
    }

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public void addBlock(int n) {
        add(MetricEvent.BLOCK, n);
    }

    //省略其他代码
}

4.2.5 各数据结构的依赖关系

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析

4.2.6 LeapArray 统计数据的大致思路

  • 创建一个长度为 n 的数组,数组元素就是窗口,窗口包装了 1 个指标桶,桶中存放了该窗口时间范围中对应的请求统计数据;
  • 可以想象成一个环形数组在时间轴上向右滚动,请求到达时,会命中数组中的一个窗口,那么该请求的数据就会存到命中的这个窗口包含的指标桶中;
  • 当数组转满一圈时,会回到数组的开头,而此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。具体过程如下图:

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析


5. 熔断槽实施服务熔断 DegradeSlot.entry()

  • 服务熔断是通过 DegradeSlot 来实现的,它会根据用户配置的熔断规则和系统运行时各个 Node 中的统计数据进行熔断判断;
  • 注意:熔断功能在 Sentinel-1.8.0 版本前后有较大变化;
  • 我们给 DegradeSlot.entry() 方法里的语句打上断点,运行到光标处;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    //【断点步入】熔断检查
    performChecking(context, resourceWrapper);
    //调用下一个 Slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
  • 进入 DegradeSlot.performChecking() 方法,其逻辑与源码如下:
    • 根据资源名称获取断路器;
    • 循环判断每个断路器;
void performChecking(Context context, ResourceWrapper r) throws BlockException {
    //根据 resourceName 获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    //循环判断每个断路器
    for (CircuitBreaker cb : circuitBreakers) {
        //【点进去】尝试通过断路器
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

5.1 继续或取消熔断功能

  • 进入 AbstractCircuitBreaker.tryPass() 方法,当请求超时并且处于探测恢复(半开状态,HALF-OPEN 状态)失败时继续断路功能;
@Override
public boolean tryPass(Context context) {
    //当前断路器状态为关闭
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() == State.OPEN) {
        //【点进去】对于半开状态,我们尝试通过
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}
  • 进入 AbstractCircuitBreaker.fromOpenToHalfOpen() 方法,实现状态的变更;
protected boolean fromOpenToHalfOpen(Context context) {
    //尝试将状态从 OPEN 设置为 HALF_OPEN
    if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        //状态变化通知
        notifyObservers(State.OPEN, State.HALF_OPEN, null);
        Entry entry = context.getCurEntry();
        //在 entry 添加一个 exitHandler  entry.exit() 时会调用
        entry.whenTerminate(new BiConsumer<Context, Entry>() {
            @Override
            public void accept(Context context, Entry entry) {
                //如果有发生异常,重新将状态设置为OPEN 请求不同通过
                if (entry.getBlockError() != null) {
                    currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                    notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                }
            }
        });
        //此时状态已设置为HALF_OPEN正常通行
        return true;
    }
    //熔断
    return false;
}
  • 上述讲解了:状态从 OPEN 变为 HALF_OPEN,HALF_OPEN 变为 OPEN;
  • 但状态从 HALF_OPEN 变为 CLOSE 需要在正常执行完请求后,由 entry.exit() 调用 DegradeSlot.exit() 方法来改变状态;

5.2 请求失败,启动熔断

  • 状态从 HALF_OPEN 变为 CLOSE 的实现方法在 DegradeSlot.exit()
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
    Entry curEntry = context.getCurEntry();
    //无阻塞异常
    if (curEntry.getBlockError() != null) {
        fireExit(context, r, count, args);
        return;
    }
    //通过资源名获取断路器
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    //没有配置断路器,则直接放行
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        fireExit(context, r, count, args);
        return;
    }

    if (curEntry.getBlockError() == null) {
        for (CircuitBreaker circuitBreaker : circuitBreakers) {
            //【点进去】在请求完成时
            circuitBreaker.onRequestComplete(context);
        }
    }
    fireExit(context, r, count, args);
}
  • 进入 ExceptionCircuitBreaker.onRequestComplete() 方法,其主要逻辑与源码如下:
    • 请求失败比例与总请求比例加 1,用于判断后续是否超过阈值;
@Override
public void onRequestComplete(Context context) {
    Entry entry = context.getCurEntry();
    if (entry == null) {
        return;
    }
    Throwable error = entry.getError();
    //简单错误计数器
    SimpleErrorCounter counter = stat.currentWindow().value();
    if (error != null) {
        //异常请求数加 1
        counter.getErrorCount().add(1);
    }
    //总请求数加 1
    counter.getTotalCount().add(1);
    //【点进去】超过阈值时变更状态
    handleStateChangeWhenThresholdExceeded(error);
}
  • 进入 ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded() 方法,变更状态;
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
    //全开则直接放行
    if (currentState.get() == State.OPEN) {
        return;
    }
    //半开状态
    if (currentState.get() == State.HALF_OPEN) {
        //检查请求
        if (error == null) {
            //发生异常,将状态从半开 HALF_OPEN 转为关闭 CLOSE
            fromHalfOpenToClose();
        } else {
            //无异常,解开半开状态
            fromHalfOpenToOpen(1.0d);
        }
        return;
    }
    
    //计算是否超过阈值
    List<SimpleErrorCounter> counters = stat.values();
    long errCount = 0;
    long totalCount = 0;
    for (SimpleErrorCounter counter : counters) {
        errCount += counter.errorCount.sum();
        totalCount += counter.totalCount.sum();
    }
    if (totalCount < minRequestAmount) {
        return;
    }
    double curCount = errCount;
    if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
        //熔断策略为:异常比例
        curCount = errCount * 1.0d / totalCount;
    }
    if (curCount > threshold) {
        transformToOpen(curCount);
    }
}

6. Sentinel 源码结构图小结

  • SphU.entry():核心逻辑的入口函数;
    • CtSph.entryWithPriority():获取 Slot 链,操作 Slot 槽;
      • CtSph.lookProcessChain():获取 ProcessorSlot 链;
        • DefaultSlotChainBuilder.build():构造 DefaultProcessorSlotChain 链(里面有 10 个 Slot 插槽);
      • ProcessorSlot.entry():遍历 ProcessorSlot 链;
        • FlowSlot.entry():遍历到 FlowSlot 槽,限流规则;

          • FlowSlot.checkFlow():检查流量规则;
            • FlowRuleChecker.checkFlow():使用检查器检查流量规则;
              • FlowSlot.ruleProvider.apply():获取流控规则;
              • FlowRuleChecker.canPassCheck():校验每条规则;
                • FlowRuleChecker.passClusterCheck():集群模式;
                • FlowRuleChecker.passLocalCheck():单机模式;
                  • FlowRuleChecker.selectNodeByRequesterAndStrategy():获取 Node;
                  • FlowRule.getRater():获得流控行为 TrafficShapingController;
                  • TrafficShapingController.canPass():执行流控行为;
        • StatisticSlot.entry:遍历到 StatisticSlot 槽,统计数据;

          • DefaultNode.increaseThreadNum():统计“增加线程数”;
            • StatisticNode.increaseThreadNum():统计“请求通过数”;
              • ArrayMetric.ArrayMetric():初始化指标数组;
                • LeapArray:环形数组;
                  • WindowWrap:窗口包装类;
                • MetricBucket:指标桶;
          • DefaultNode.addPassRequest():统计“增加线程数”;
            • StatisticNode.addPassRequest():同上;
        • DegradeSlot.entry():遍历到 DegradeSlot 槽,服务熔断;

          • DegradeSlot.performChecking():执行检查;
            • DegradeRuleManager.getCircuitBreakers():根据 resourceName 获取断路器;
            • AbstractCircuitBreaker.tryPass():继续或取消熔断功能;
              • AbstractCircuitBreaker.fromOpenToHalfOpen():尝试通过半开状态;
        • DegradeSlot.exit():请求失败(超时),启动熔断;

          • ExceptionCircuitBreaker.onRequestComplete():在请求完成时操作;
            • ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded():变更状态;

最后

新人制作,如有错误,欢迎指出,感激不尽!
欢迎关注公众号,会分享一些更日常的东西!
如需转载,请标注出处!
微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析
上一篇:Certificate used to sign the license is not signed by JetBrains root certifi(2)问题解决


下一篇:从需求设计角度看redis的sentinel