SpringBoot进阶教程(六十七)RateLimiter限流

在上一篇文章nginx限流配置中,我们介绍了如何使用nginx限流,这篇文章介绍另外一种限流方式---RateLimiter。

v限流背景

在早期的计算机领域,限流技术(time limiting)被用作控制网络接口收发通信数据的速率。 可以用来优化性能,减少延迟和提高带宽等。 现在在互联网领域,也借鉴了这个概念, 用来为服务控制请求的速率, 如果双十一的限流, 12306的抢票等。 即使在细粒度的软件架构中,也有类似的概念。

系统使用下游资源时,需要考虑下游对资源受限、处理能力,在下游资源无法或者短时间内无法提升处理性能的情况下,可以使用限流器或者类似保护机制,避免下游服务崩溃造成整体服务的不可用。

v限流算法

常见限流算法有两种:漏桶算法和令牌桶算法。

2.1 漏桶算法

漏桶算法(Leaky Bucket)是网络世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。

漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。 在网络中,漏桶算法可以控制端口的流量输出速率,平滑网络上的突发流量,实现流量整形,从而为网络提供一个稳定的流量。

如下图所示,把请求比作是水,水来了都先放进桶里,并以限定的速度出水,当水来得过猛而出水不够快时就会导致水直接溢出,即拒绝服务。

SpringBoot进阶教程(六十七)RateLimiter限流

图片来源于网络,侵删。

可以看出,漏桶算法可以很好的控制流量的访问速度,一旦超过该速度就拒绝服务。

2.2 令牌桶算法

令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

如下图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

SpringBoot进阶教程(六十七)RateLimiter限流

图片来源于网络,侵删。

本文后面实例中讲到的RateLimiter(Google的Guava包)正是使用的令牌桶算法。

2.3 漏桶算法和令牌桶算法的区别

漏桶算法的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。漏桶算法通常可以用于限制访问外部接口的流量,保护其他人系统,比如我们请求银行接口,通常要限制并发数。

令牌桶算法生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,可以处理瞬时流量,而且拿令牌的过程并不是消耗很大的事情。令牌桶算法通常可以用于限制被访问的流量,保护自身系统。

需要注意的是,在某些情况下,漏桶算法不能够有效地使用网络资源,因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制。

vRateLimiter

3.1 基础介绍

RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。

RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。

3.2 方法摘要

修饰符和类型 方法和描述
double acquire()
从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
double acquire(int permits)
从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
static RateLimiter create(double permitsPerSecond)
根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
double getRate()
返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
void setRate(double permitsPerSecond)
更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
String toString()
返回对象的字符表现形式
boolean tryAcquire()
从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)
从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
boolean tryAcquire(long timeout, TimeUnit unit)
从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)

3.3 实验

3.3.1 添加引用

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.8.10</version>
        </dependency>

3.3.2 添加注解

package learn.web.interceptor;

import java.lang.annotation.*;

/**
 * @author toutou
 * @date by 2020/12
 * @des 限流注解
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Limiting {

    // 默认每秒放入桶中的token
    double limitNum() default 20;

    String name() default "";
}

3.3.3 aop切面

package learn.web.interceptor;

import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author toutou
 * @date by 2020/12
 * @des
 */
@Aspect
@Component
@Slf4j
public class RateLimitAspect {
    private ConcurrentHashMap<String, RateLimiter> RATE_LIMITER  = new ConcurrentHashMap<>();
    private RateLimiter rateLimiter;

    @Pointcut("@annotation(learn.web.interceptor.Limiting)")
    public void serviceLimit() {
    }

    @Around("serviceLimit()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        //获取拦截的方法名
        Signature sig = point.getSignature();
        //获取拦截的方法名
        MethodSignature msig = (MethodSignature) sig;
        //返回被织入增加处理目标对象
        Object target = point.getTarget();
        //为了获取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //获取注解信息
        Limiting annotation = currentMethod.getAnnotation(Limiting.class);
        double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token
        String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略

        if(RATE_LIMITER.containsKey(functionName)){
            rateLimiter=RATE_LIMITER.get(functionName);
        }else {
            RATE_LIMITER.put(functionName, RateLimiter.create(limitNum));
            rateLimiter=RATE_LIMITER.get(functionName);
        }
        if(rateLimiter.tryAcquire()) {
            log.info("处理完成");
            return point.proceed();
        } else {
            throw new RuntimeException("服务器繁忙,请稍后再试。");
        }
    }
}

3.3.4 添加Controller测试

package learn.web.controller;

import learn.model.vo.Result;
import learn.web.interceptor.Limiting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author toutou
 * @date by 2020/12
 * @des
 */
@Slf4j
@RestController
public class IndexController {
    /**
     * 使用限流注解的接口1
     * @return
     */
    @GetMapping("/limit1")
    @Limiting(limitNum = 1, name = "limiting1")
    public Result Limit1() {
        return Result.setSuccessResult("limiting1");
    }

    /**
     * 使用限流注解的接口2
     * @return
     */
    @GetMapping("/limit2")
    @Limiting(limitNum = 5, name = "limiting2")
    public Result Limit2() {
        return Result.setSuccessResult("limiting2");
    }
}

3.3.5 测试效果

ab测试截图

SpringBoot进阶教程(六十七)RateLimiter限流

浏览器截图

SpringBoot进阶教程(六十七)RateLimiter限流

其他参考/学习资料:

v源码地址

https://github.com/toutouge/javademosecond/tree/master/hellolearn


作  者:请叫我头头哥
出  处:http://www.cnblogs.com/toutou/
关于作者:专注于基础平台的项目开发。如有问题或建议,请多多赐教!
版权声明:本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。
特此声明:所有评论和私信都会在第一时间回复。也欢迎园子的大大们指正错误,共同进步。或者直接私信
声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。您的鼓励是作者坚持原创和持续写作的最大动力!

上一篇:【java】高并发之限流 RateLimiter使用


下一篇:Guava RateLimiter + AOP注解实现单机限流