时间窗限流算法
每个时间窗口长度为 10t ,当 单位时间 10t 时长范围内,超过 100 时,将会被限流;
存在的问题:相邻的时间窗之间截取新的时间窗,如: 16t ---- 26t ,同样为 10t 时间窗长度,但其请求数为 110 ,但系统认为是通过的
该算法存在这样的问题:连续两个时间窗口中的统计数据都没有超出阈值,但在跨窗口的时间窗长度范围内的统计数据却超出了阈值
滑动时间窗限流算法
滑动时间窗限流算法解决了固定时间窗限流算法的问题。其没有划分固定的时间窗起点与终点,而是将每一次请求的到来时间点作为统计时间窗的终点,起点则是终点向前推时间窗长度的时间点。这种时间窗称为“滑动时间窗”
时间点1:
时间点2:
存在的问题:
如下图:分析点1 与分析点2 存在大量的重叠区域,所以统计过程中存在大量测重复工作,浪费了大量的系统资源
算法改进:
将时间窗口进行细分为多个子 时间窗口:样本窗口
针对以上问题,系统采用了一种“折中”的改进措施:将整个时间轴拆分为若干“样本窗口”,样本窗口的长度是小于滑动时间窗口长度的。当等于滑动时间窗口长度时,就变为了“固定时间窗口算法”。 一般时间窗口长度会是样本窗口长度的整数倍。那么是如何判断一个请求是否能够通过呢?当到达样本窗口终点时间时,每个样本窗口会统计一次本样本窗口中的流量数据并记录下来。当一个请求到达时,会统计出当前请求时间点所在样本窗口中的流量数据,然后再获取到当前请求时间点所在时间窗中其它样本窗口的统计数据,求和后,如果没有超出阈值,则通过,否则被限流。
数据统计源码解析
Sentinel滑动时间窗算法源码解析—数据统计如下图:
Sentinel滑动时间窗算法源码解析—使用统计数据如下图:
StatisticNode 滑动时间窗计数器
// 以秒为单位的计量器,定义了一个使用数组保存数据的计量器 private transient volatile Metric rollingCounterInSecond = // SAMPLE_COUNT,样本窗口数量,默认值为2 // INTERVAL,时间窗长度,默认值1000毫秒,1秒 new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); // 一分为单位的计量器 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
@Override public void addPassRequest(int count) { // 为滑动计数器增加本次访问的数据 rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
// 这是一个使用数组保存数据的计量器类 public class ArrayMetric implements Metric { // 数据就保存在这个data中 private final LeapArray<MetricBucket> data; 。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
源码:( 请使用 IDEA 结合查看源码 )
// 环形数组 public abstract class LeapArray<T> { // 样本窗口长度 protected int windowLengthInMs; // 一个时间窗中包含的时间窗数量 protected int sampleCount; // 时间窗长度 protected int intervalInMs; private double intervalInSecond; // 这个一个数组,元素为WindowWrap样本窗口 // 注意,这里的泛型 T 实际为 MetricBucket 类型 protected final AtomicReferenceArray<WindowWrap<T>> array; 。。。。。。。。。。。。。。。。。。。。。。。。。。 public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. // 计算当前样本窗口的开始时间点 long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { // 获取到当前时间所在的样本窗口 WindowWrap<T> old = array.get(idx); // 若当前时间所在样本窗口为null,说明该样本窗口还不存在,则创建一个 if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ // 创建一个时间窗 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); // 通过CAS方式将新建窗口放入到array if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 若当前样本窗口的起始时间点与计算出的样本窗口起始时间点相同, // 则说明这两个是同一个样本窗口 } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; // 若当前样本窗口的起始时间点 大于 计算出的样本窗口起始时间点, // 说明计算出的样本窗口已经过时了,需要将原来的样本窗口替换 } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. // 替换掉老的样本窗口 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 当前样本窗口的起始时间点 小于 计算出的样本窗口起始时间点, // 这种情况一般不会出现,因为时间不会倒流。除非人为修改了系统时钟 } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
。
// 样本窗口实例,泛型T为MetricBucket public class WindowWrap<T> { /** * Time length of a single window bucket in milliseconds. */ // 样本窗口长度 private final long windowLengthInMs; /** * Start timestamp of the window in milliseconds. */ // 样本窗口的起始时间戳 private long windowStart; /** * Statistic data. */ // 当前样本窗口中的统计数据,其类型为 MetricBucket private T value; 。。。。。。。。。。。。。。。。。。
。
// 统计数据的封装类 public class MetricBucket { // 统计的数据存放在这里 // 这里要统计的数据是多维度的,这些维度类型在 MetricEvent 枚举中 private final LongAdder[] counters; private volatile long minRt; 。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
。
// 数据统计的维度 public enum MetricEvent { /** * Normal pass. */ PASS, /** * Normal block. */ BLOCK, EXCEPTION, SUCCESS, RT, /** * Passed in future quota (pre-occupied, since 1.5.0). */ OCCUPIED_PASS }
.....