前言
本文是在java语言场景下,基于Redis内存数据库和Lua脚本技术实现的令牌桶方案。需要看官了解简单的上述名词的含义。
引用
https://www.jianshu.com/p/c8cee1507ccc?from=singlemessage
名词解释
案例
日常业务中,服务调用角度,pc端使用与app端使用频次有明显的区别。一方面,为了服务的稳定性;另一方面,业务场景中不必要的高频次使用并不合理。由此,部分业务场景衍生出了一些在服务接口层面的卡口的需求,类同腾讯的IM在接口使用上的限制。eg:接口的并发请求频次、亦或是单个窗口期内的使用次数的限制等。
在有类似的业务需求的同时,参照了市面上,比较合适(易上手,易维护,易传达等)的技术方案,想到了,基于Redis内存数据库和lua脚本的令牌桶方案,也因此想要记录一篇本次使用记录。
下面开始上代码!!!
引入依赖
1 <dependencies> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-web</artifactId> 5 </dependency> 6 <dependency> 7 <groupId>org.springframework.boot</groupId> 8 <artifactId>spring-boot-starter-data-redis</artifactId> 9 </dependency> 10 <dependency> 11 <groupId>org.springframework.boot</groupId> 12 <artifactId>spring-boot-starter-aop</artifactId> 13 </dependency> 14 <dependency> 15 <groupId>org.apache.commons</groupId> 16 <artifactId>commons-lang3</artifactId> 17 </dependency> 18 <dependency> 19 <groupId>org.springframework.boot</groupId> 20 <artifactId>spring-boot-starter-test</artifactId> 21 </dependency> 22 </dependencies>依赖
Redis配置
1 spring.application.name=spring-boot-limit 2 3 # Redis数据库索引 4 spring.redis.database=0 5 # Redis服务器地址 6 spring.redis.host=10.4.89.161 7 # Redis服务器连接端口 8 spring.redis.port=6379 9 # Redis服务器连接密码(默认为空) 10 spring.redis.password= 11 # 连接池最大连接数(使用负值表示没有限制) 12 spring.redis.jedis.pool.max-active=8 13 # 连接池最大阻塞等待时间(使用负值表示没有限制) 14 spring.redis.jedis.pool.max-wait=-1 15 # 连接池中的最大空闲连接 16 spring.redis.jedis.pool.max-idle=8 17 # 连接池中的最小空闲连接 18 spring.redis.jedis.pool.min-idle=0 19 # 连接超时时间(毫秒) 20 spring.redis.timeout=10000Redis配置
Lua 脚本
1 -- 返回码 1:操作成功 0:未配置 -1: 获取失败 -2:修改错误,建议重新初始化 -500:不支持的操作 2 -- redis hashmap 中存放的内容: 3 -- last_mill_second 上次放入令牌或者初始化的时间 4 -- stored_permits 目前令牌桶中的令牌数量 5 -- max_permits 令牌桶容量 6 -- interval 放令牌间隔 7 -- app 一个标志位,表示对于当前key有没有限流存在 8 9 local SUCCESS = 1 10 local NO_LIMIT = 0 11 local ACQUIRE_FAIL = -1 12 local MODIFY_ERROR = -2 13 local UNSUPPORT_METHOD = -500 14 15 local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app") 16 local last_mill_second = ratelimit_info[1] 17 local stored_permits = tonumber(ratelimit_info[2]) 18 local max_permits = tonumber(ratelimit_info[3]) 19 local interval = tonumber(ratelimit_info[4]) 20 local app = ratelimit_info[5] 21 22 local method = ARGV[1] 23 24 --获取当前毫秒 25 --考虑主从策略和脚本回放机制,这个time由客户端获取传入 26 --local curr_time_arr = redis.call('TIME') 27 --local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000 28 local curr_timestamp = tonumber(ARGV[2]) 29 30 31 -- 当前方法为初始化 32 if method == 'init' then 33 --如果app不为null说明已经初始化过,不要重复初始化 34 if(type(app) ~='boolean' and app ~=nil) then 35 return SUCCESS 36 end 37 38 redis.pcall("HMSET", KEYS[1], 39 "last_mill_second", curr_timestamp, 40 "stored_permits", ARGV[3], 41 "max_permits", ARGV[4], 42 "interval", ARGV[5], 43 "app", ARGV[6]) 44 --始终返回成功 45 return SUCCESS 46 end 47 48 -- 当前方法为修改配置 49 if method == "modify" then 50 if(type(app) =='boolean' or app ==nil) then 51 return MODIFY_ERROR 52 end 53 --只能修改max_permits和interval 54 redis.pcall("HMSET", KEYS[1], 55 "max_permits", ARGV[3], 56 "interval", ARGV[4]) 57 58 return SUCCESS 59 60 end 61 62 -- 当前方法为删除 63 if method == "delete" then 64 --已经清除完毕 65 if(type(app) =='boolean' or app ==nil) then 66 return SUCCESS 67 end 68 redis.pcall("DEL", KEYS[1]) 69 return SUCCESS 70 end 71 72 -- 尝试获取permits 73 if method == "acquire" then 74 -- 如果app为null说明没有对这个进行任何配置,返回0代表不限流 75 if(type(app) =='boolean' or app ==nil) then 76 return NO_LIMIT 77 end 78 --需要获取令牌数量 79 local acquire_permits = tonumber(ARGV[3]) 80 --计算上一次放令牌到现在的时间间隔中,一共应该放入多少令牌 81 local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval)) 82 83 local new_permits = math.min(max_permits, stored_permits + reserve_permits) 84 local result = ACQUIRE_FAIL 85 --如果桶中令牌数量够则放行 86 if new_permits >= acquire_permits then 87 result = SUCCESS 88 new_permits = new_permits - acquire_permits 89 end 90 --更新当前桶中的令牌数量 91 redis.pcall("HSET", KEYS[1], "stored_permits", new_permits) 92 --如果这次有放入令牌,则更新时间 93 if reserve_permits > 0 then 94 redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp) 95 end 96 return result 97 end 98 99 100 return UNSUPPORT_METHODLua脚本
redis的lua脚本读取配置:
1 @Bean("rateLimitLua") 2 public DefaultRedisScript<Long> getRateLimitScript() { 3 DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>(); 4 rateLimitLua.setLocation(new ClassPathResource("scripts/rate_limit.lua")); 5 rateLimitLua.setResultType(Long.class); 6 return rateLimitLua; 7 }脚本读取
适配lua的枚举类
1 /** 2 * 3 * 4 * 限流的具体方法 5 */ 6 public enum RateLimitMethod { 7 8 //initialize rate limiter 9 init, 10 11 //modify rate limiter parameter 12 modify, 13 14 //delete rate limiter 15 delete, 16 17 //acquire permits 18 acquire; 19 }限流的具体方法
1 /** 2 * 3 * 操作redis的结果标识类 4 **/ 5 public enum RateLimitResult { 6 7 SUCCESS(1L), 8 NO_LIMIT(0L), 9 ACQUIRE_FAIL(-1L), 10 MODIFY_ERROR(-2L), 11 UNSUPPORT_METHOD(-500L), 12 ERROR(-505L); 13 14 @Getter 15 private Long code; 16 17 RateLimitResult(Long code){ 18 this.code = code; 19 } 20 21 public static RateLimitResult getResult(Long code){ 22 for(RateLimitResult enums: RateLimitResult.values()){ 23 if(enums.code.equals(code)){ 24 return enums; 25 } 26 } 27 throw new IllegalArgumentException("unknown rate limit return code:" + code); 28 } 29 }操作redis的结果标识类
1 /** 2 * 令牌桶参数对象 3 **/ 4 @Data 5 @Builder 6 public class RateLimitVo { 7 /** 8 *是否生效 false-不生效-默认 true-生效 9 **/ 10 private boolean isLimit; 11 /** 12 *生产令牌间隔,单位-毫秒 13 **/ 14 private Double interval; 15 /** 16 *令牌桶内令牌最大数量 17 **/ 18 private Integer maxPermits; 19 /** 20 *初始化令牌桶内的数量 21 **/ 22 private Integer initialPermits; 23 24 }令牌桶参数对象
1 /** 2 * 令牌桶的组装方法 3 **/ 4 @Service 5 @Slf4j 6 public class RateLimitClient { 7 /** 8 *redis内key的前缀 9 **/ 10 private static final String RATE_LIMIT_PREFIX = "rate_limit:"; 11 12 @Autowired 13 StringRedisTemplate redisTemplate; 14 15 @Resource 16 @Qualifier("rateLimitLua") 17 RedisScript<Long> rateLimitScript; 18 19 public RateLimitResult init(String key, RateLimitVo rateLimitInfo){ 20 return exec(key, RateLimitMethod.init, 21 rateLimitInfo.getInitialPermits(), 22 rateLimitInfo.getMaxPermits(), 23 rateLimitInfo.getInterval(), 24 key); 25 } 26 27 public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){ 28 return exec(key, RateLimitMethod.modify, key, 29 rateLimitInfo.getMaxPermits(), 30 rateLimitInfo.getInterval()); 31 } 32 33 public RateLimitResult delete(String key){ 34 return exec(key, RateLimitMethod.delete); 35 } 36 37 public RateLimitResult acquire(String key){ 38 return acquire(key, 1); 39 } 40 41 public RateLimitResult acquire(String key, Integer permits){ 42 return exec(key, RateLimitMethod.acquire, permits); 43 } 44 45 /** 46 * 执行redis的具体方法,限制method,保证没有其他的东西进来 47 * @param key 48 * @param method 49 * @param params 50 * @return 51 */ 52 private RateLimitResult exec(String key, RateLimitMethod method, Object... params){ 53 try { 54 Long timestamp = getRedisTimestamp(); 55 String[] allParams = new String[params.length + 2]; 56 allParams[0] = method.name(); 57 allParams[1] = timestamp.toString(); 58 for(int index = 0;index < params.length; index++){ 59 allParams[2 + index] = params[index].toString(); 60 } 61 Long result = redisTemplate.execute(rateLimitScript, 62 Collections.singletonList(getKey(key)), 63 allParams); 64 return RateLimitResult.getResult(result); 65 } catch (Exception e){ 66 log.error("execute redis script fail, key:{}, method:{}", 67 key, method.name(), e); 68 return RateLimitResult.ERROR; 69 } 70 } 71 72 private Long getRedisTimestamp(){ 73 Long currMillSecond = redisTemplate.execute( 74 (RedisCallback<Long>) redisConnection -> redisConnection.time() 75 ); 76 return currMillSecond; 77 } 78 private String getKey(String key){ 79 return RATE_LIMIT_PREFIX + key; 80 } 81 }令牌桶的组装方法
测试代码
1 /** 2 * 测试代码 3 **/ 4 @Component 5 public class RateLimitTest { 6 7 @Autowired 8 private RateLimitClient rateLimitClient; 9 10 11 public void testAcquire(String key) { 12 RateLimitResult result = rateLimitClient.acquire(key); 13 if (result.getCode() == SUCCESS ){ 14 System.out.println(" SUCCESS! ") 15 } else if (result.getCode() == NO_LIMIT ){ 16 rateLimitClient.init(key,RateLimitVo.builder.interval(1.0).initialPermits(0).maxPermits(1).build()); 17 System.out.println(" INIT! ") 18 } else { 19 System.out.println(" ERROR! ") 20 } 21 } 22 }简单-测试代码
结论
通过redis和lua,我实现了一个简单的分布式限流器。通过上述代码,大家能看到一个大致的实现框架,并且通过测试代码完成了验证。