Redis&Lua の令牌桶 -初体验

前言

本文是在java语言场景下,基于Redis内存数据库和Lua脚本技术实现的令牌桶方案。需要看官了解简单的上述名词的含义。

引用

https://www.jianshu.com/p/c8cee1507ccc?from=singlemessage

名词解释

Redis内存数据库

Lua脚本

令牌桶


案例


日常业务中,服务调用角度,pc端使用与app端使用频次有明显的区别。一方面,为了服务的稳定性;另一方面,业务场景中不必要的高频次使用并不合理。由此,部分业务场景衍生出了一些在服务接口层面的卡口的需求,类同腾讯的IM在接口使用上的限制。eg:接口的并发请求频次、亦或是单个窗口期内的使用次数的限制等。

在有类似的业务需求的同时,参照了市面上,比较合适(易上手,易维护,易传达等)的技术方案,想到了,基于Redis内存数据库和lua脚本的令牌桶方案,也因此想要记录一篇本次使用记录。

下面开始上代码!!!

引入依赖

Redis&Lua の令牌桶  -初体验
 1 <dependencies>
 2     <dependency>
 3         <groupId>org.springframework.boot</groupId>
 4         <artifactId>spring-boot-starter-web</artifactId>
 5     </dependency>
 6     <dependency>
 7         <groupId>org.springframework.boot</groupId>
 8         <artifactId>spring-boot-starter-data-redis</artifactId>
 9     </dependency>
10     <dependency>
11         <groupId>org.springframework.boot</groupId>
12         <artifactId>spring-boot-starter-aop</artifactId>
13     </dependency>
14     <dependency>
15         <groupId>org.apache.commons</groupId>
16         <artifactId>commons-lang3</artifactId>
17     </dependency>
18     <dependency>
19         <groupId>org.springframework.boot</groupId>
20         <artifactId>spring-boot-starter-test</artifactId>
21     </dependency>
22 </dependencies>
依赖

 

Redis配置

Redis&Lua の令牌桶  -初体验
 1 spring.application.name=spring-boot-limit
 2 
 3 # Redis数据库索引
 4 spring.redis.database=0
 5 # Redis服务器地址
 6 spring.redis.host=10.4.89.161
 7 # Redis服务器连接端口
 8 spring.redis.port=6379
 9 # Redis服务器连接密码(默认为空)
10 spring.redis.password=
11 # 连接池最大连接数(使用负值表示没有限制)
12 spring.redis.jedis.pool.max-active=8
13 # 连接池最大阻塞等待时间(使用负值表示没有限制)
14 spring.redis.jedis.pool.max-wait=-1
15 # 连接池中的最大空闲连接
16 spring.redis.jedis.pool.max-idle=8
17 # 连接池中的最小空闲连接
18 spring.redis.jedis.pool.min-idle=0
19 # 连接超时时间(毫秒)
20 spring.redis.timeout=10000
Redis配置

Lua 脚本

Redis&Lua の令牌桶  -初体验
  1 -- 返回码 1:操作成功 0:未配置 -1: 获取失败 -2:修改错误,建议重新初始化 -500:不支持的操作
  2 -- redis hashmap 中存放的内容:
  3 -- last_mill_second 上次放入令牌或者初始化的时间
  4 -- stored_permits 目前令牌桶中的令牌数量
  5 -- max_permits 令牌桶容量
  6 -- interval 放令牌间隔
  7 -- app 一个标志位,表示对于当前key有没有限流存在
  8 
  9 local SUCCESS = 1
 10 local NO_LIMIT = 0
 11 local ACQUIRE_FAIL = -1
 12 local MODIFY_ERROR = -2
 13 local UNSUPPORT_METHOD = -500
 14 
 15 local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app")
 16 local last_mill_second = ratelimit_info[1]
 17 local stored_permits = tonumber(ratelimit_info[2])
 18 local max_permits = tonumber(ratelimit_info[3])
 19 local interval = tonumber(ratelimit_info[4])
 20 local app = ratelimit_info[5]
 21 
 22 local method = ARGV[1]
 23 
 24 --获取当前毫秒
 25 --考虑主从策略和脚本回放机制,这个time由客户端获取传入
 26 --local curr_time_arr = redis.call('TIME')
 27 --local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000
 28 local curr_timestamp = tonumber(ARGV[2])
 29 
 30 
 31 -- 当前方法为初始化
 32 if method == 'init' then
 33     --如果app不为null说明已经初始化过,不要重复初始化
 34     if(type(app) ~='boolean' and app ~=nil) then
 35         return SUCCESS
 36     end
 37 
 38     redis.pcall("HMSET", KEYS[1],
 39         "last_mill_second", curr_timestamp,
 40         "stored_permits", ARGV[3],
 41         "max_permits", ARGV[4],
 42         "interval", ARGV[5],
 43         "app", ARGV[6])
 44     --始终返回成功
 45     return SUCCESS
 46 end
 47 
 48 -- 当前方法为修改配置
 49 if method == "modify" then
 50     if(type(app) =='boolean' or app ==nil) then
 51         return MODIFY_ERROR
 52     end
 53     --只能修改max_permits和interval
 54     redis.pcall("HMSET", KEYS[1],
 55         "max_permits", ARGV[3],
 56         "interval", ARGV[4])
 57 
 58     return SUCCESS
 59 
 60 end
 61 
 62 -- 当前方法为删除
 63 if method == "delete" then
 64     --已经清除完毕
 65     if(type(app) =='boolean' or app ==nil) then
 66         return SUCCESS
 67     end
 68     redis.pcall("DEL", KEYS[1])
 69     return SUCCESS
 70 end
 71 
 72 -- 尝试获取permits
 73 if method == "acquire" then
 74     -- 如果app为null说明没有对这个进行任何配置,返回0代表不限流
 75     if(type(app) =='boolean' or app ==nil) then
 76         return NO_LIMIT
 77     end
 78     --需要获取令牌数量
 79     local acquire_permits = tonumber(ARGV[3])
 80     --计算上一次放令牌到现在的时间间隔中,一共应该放入多少令牌
 81     local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval))
 82     
 83     local new_permits = math.min(max_permits, stored_permits + reserve_permits)
 84     local result = ACQUIRE_FAIL
 85     --如果桶中令牌数量够则放行
 86     if new_permits >= acquire_permits then
 87         result = SUCCESS
 88         new_permits = new_permits - acquire_permits
 89     end
 90     --更新当前桶中的令牌数量 
 91     redis.pcall("HSET", KEYS[1], "stored_permits", new_permits)
 92     --如果这次有放入令牌,则更新时间
 93     if reserve_permits > 0 then
 94         redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp)
 95     end
 96     return result
 97 end
 98 
 99 
100 return UNSUPPORT_METHOD
Lua脚本

 

redis的lua脚本读取配置:

Redis&Lua の令牌桶  -初体验
1  @Bean("rateLimitLua")
2     public DefaultRedisScript<Long> getRateLimitScript() {
3         DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>();
4         rateLimitLua.setLocation(new ClassPathResource("scripts/rate_limit.lua"));
5         rateLimitLua.setResultType(Long.class);
6         return rateLimitLua;
7     }
脚本读取

适配lua的枚举类

Redis&Lua の令牌桶  -初体验
 1 /**
 2  *  
 3  *
 4  * 限流的具体方法
 5  */
 6 public enum RateLimitMethod {
 7 
 8     //initialize rate limiter
 9     init,
10 
11     //modify rate limiter parameter
12     modify,
13 
14     //delete rate limiter
15     delete,
16 
17     //acquire permits
18     acquire;
19 }
限流的具体方法 Redis&Lua の令牌桶  -初体验
 1 /**
 2  *  
 3  * 操作redis的结果标识类
 4  **/
 5 public enum RateLimitResult {
 6 
 7     SUCCESS(1L),
 8     NO_LIMIT(0L),
 9     ACQUIRE_FAIL(-1L),
10     MODIFY_ERROR(-2L),
11     UNSUPPORT_METHOD(-500L),
12     ERROR(-505L);
13 
14     @Getter
15     private Long code;
16 
17     RateLimitResult(Long code){
18         this.code = code;
19     }
20 
21     public static RateLimitResult getResult(Long code){
22         for(RateLimitResult enums: RateLimitResult.values()){
23             if(enums.code.equals(code)){
24                 return enums;
25             }
26         }
27         throw new IllegalArgumentException("unknown rate limit return code:" + code);
28     }
29 }
操作redis的结果标识类 Redis&Lua の令牌桶  -初体验
 1 /**
 2  * 令牌桶参数对象
 3  **/
 4 @Data
 5 @Builder
 6 public class RateLimitVo { 
 7 /**
 8 *是否生效  false-不生效-默认  true-生效
 9 **/
10     private boolean isLimit;
11 /**
12 *生产令牌间隔,单位-毫秒
13 **/
14     private Double interval;
15 /**
16 *令牌桶内令牌最大数量
17 **/
18     private Integer maxPermits;
19 /**
20 *初始化令牌桶内的数量
21 **/
22     private Integer initialPermits;
23 
24 }
令牌桶参数对象 Redis&Lua の令牌桶  -初体验
 1 /**
 2  * 令牌桶的组装方法
 3  **/
 4 @Service
 5 @Slf4j
 6 public class RateLimitClient {
 7 /**
 8 *redis内key的前缀
 9 **/
10     private static final String RATE_LIMIT_PREFIX = "rate_limit:";
11 
12     @Autowired
13     StringRedisTemplate redisTemplate;
14 
15     @Resource
16     @Qualifier("rateLimitLua")
17     RedisScript<Long> rateLimitScript;
18 
19     public RateLimitResult init(String key, RateLimitVo rateLimitInfo){
20         return exec(key, RateLimitMethod.init,
21                 rateLimitInfo.getInitialPermits(),
22                 rateLimitInfo.getMaxPermits(),
23                 rateLimitInfo.getInterval(),
24                 key);
25     }
26 
27     public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){
28         return exec(key, RateLimitMethod.modify, key,
29                 rateLimitInfo.getMaxPermits(),
30                 rateLimitInfo.getInterval());
31     }
32 
33     public RateLimitResult delete(String key){
34         return exec(key, RateLimitMethod.delete);
35     }
36 
37     public RateLimitResult acquire(String key){
38         return acquire(key, 1);
39     }
40 
41     public RateLimitResult acquire(String key, Integer permits){
42         return exec(key, RateLimitMethod.acquire, permits);
43     }
44 
45     /**
46      * 执行redis的具体方法,限制method,保证没有其他的东西进来
47      * @param key
48      * @param method
49      * @param params
50      * @return
51      */
52     private RateLimitResult exec(String key, RateLimitMethod method, Object... params){
53         try {
54             Long timestamp = getRedisTimestamp();
55             String[] allParams = new String[params.length + 2];
56             allParams[0] = method.name();
57             allParams[1] = timestamp.toString();
58             for(int index = 0;index < params.length; index++){
59                 allParams[2 + index] = params[index].toString();
60             }
61             Long result = redisTemplate.execute(rateLimitScript,
62                     Collections.singletonList(getKey(key)),
63                     allParams);
64             return RateLimitResult.getResult(result);
65         } catch (Exception e){
66             log.error("execute redis script fail, key:{}, method:{}",
67                     key, method.name(), e);
68             return RateLimitResult.ERROR;
69         }
70     }
71 
72     private Long getRedisTimestamp(){
73         Long currMillSecond = redisTemplate.execute(
74                 (RedisCallback<Long>) redisConnection -> redisConnection.time()
75         );
76         return currMillSecond;
77     }
78     private String getKey(String key){
79         return RATE_LIMIT_PREFIX + key;
80     }
81 }
令牌桶的组装方法

测试代码

Redis&Lua の令牌桶  -初体验
 1 /**
 2  *  测试代码
 3  **/
 4 @Component
 5 public class RateLimitTest {
 6 
 7     @Autowired
 8     private RateLimitClient rateLimitClient;
 9  
10      
11     public void testAcquire(String key)   {
12         RateLimitResult   result = rateLimitClient.acquire(key);
13         if  (result.getCode() == SUCCESS ){
14             System.out.println(" SUCCESS! ")
15         } else if (result.getCode() == NO_LIMIT ){
16         rateLimitClient.init(key,RateLimitVo.builder.interval(1.0).initialPermits(0).maxPermits(1).build());
17             System.out.println(" INIT! ")
18         } else {
19             System.out.println(" ERROR! ")
20         }        
21     }
22 }
简单-测试代码

 

结论

通过redis和lua,我实现了一个简单的分布式限流器。通过上述代码,大家能看到一个大致的实现框架,并且通过测试代码完成了验证。

上一篇:牛皮了!基于 Redis 的限流系统的设计!


下一篇:并发和多线程(十八)--CountDownLatch、Semaphore和CyclicBarrier源码解析