限流一般有漏桶和令牌桶两种实现,详情网上很多资料。 一般都会选用令牌桶算法,比较灵活,可以支持预热、容忍一定突发流量、预支一部分流量的弹性。
单进程限流 单个jvm限流有现成的库,谷歌的guava库提供了RateLimiter
,底层实现上有两个,一个是能容忍突发的实现,一个是能提供预热功能的实现;通过create时提供不同的参数来获得。
实现原理 朴素解法 如果按照令牌桶算法朴素的定义来实现的话,一个很自然的思路就是增加一个定时线程、进程,然后不断生成新的token;
朴素解法的缺点 重度依赖这个定时线程,如果定时程序挂了,所有工作线程就被卡住了,风险较大。 如果给定时线程加监控,又会需要监控的监控,那就变成俄罗斯套娃了。
guava库中的解法 需要访问者多输入一个参数: 当前时钟。lazy eval
: 每次取token的时候才计算当前”应该”有多少token。 基于时钟来计算: 当前时钟下, 过去了多久没有生成token,应该生成多少新的token。 核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 void resync (long nowMicros) { if (nowMicros > nextFreeTicketMicros) { storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros()); nextFreeTicketMicros = nowMicros; } }
guava库中解法的优点 开销更低: 去掉了定时线程的开销; 健壮性更高: 每个线程都可以承担生成新token的任务;
guava库中解法的缺点 每个线程的时钟得对齐。但这一点在单进程场景下很好保证。
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import com.google.common.util.concurrent.RateLimiter;private static void run1 () { RateLimiter limiter = RateLimiter.create(5 ); while (true ) { System.out.println("get 1 tokens: " + limiter.acquire() + "s" ); } } private static void run2 () { RateLimiter r = RateLimiter.create(2 , 3 , TimeUnit.SECONDS); while (true ) { System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("end" ); } } private static void run3 () { RateLimiter r = RateLimiter.create(5 ); while (true ) { System.out.println("get 5 tokens: " + r.acquire(5 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("get 1 tokens: " + r.acquire(1 ) + "s" ); System.out.println("end" ); } }
分布式限流 如果是多实例的情况下,下游处理能力有上限时(例如物料有限),需要对整体有一个限流。 可以参考guava的实现,实现一个基于redis的,有一定弹性的令牌桶实现: (由于是借鉴guava的实现,所以有相同的依赖: 所有进程、机器的时钟对齐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void tokenLimitDemo () { DefaultRedisScript<Long> lua = new DefaultRedisScript<>(); lua.setResultType(Long.class ) ; lua.setScriptSource(new ResourceScriptSource(new ClassPathResource("tokenLimiter.lua" ))); List<String> keys = new ArrayList<>(); keys.add("test_ip" ); for (int i = 0 ; i < 100 ; i++) { Long res = (Long) redisTemplate.execute(lua, keys , "1" , String.valueOf(System.currentTimeMillis()) ); if (res != 1 ) { try { System.out.println("waiting" ); Thread.sleep(100 ); i--; } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(res); } } }
相应的lua脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 local need_token = tonumber (ARGV[1 ])local req_time = tonumber (ARGV[2 ])if type (need_token) ~= "number" or type (req_time) ~= "number" then return 0 end local key = KEYS[1 ]local info = redis.pcall ("HMGET" , key, "last_time" , "cur_token_num" , "max_token" , "rate" )local last_time = tonumber (info[1 ])local cur_token_num = tonumber (info[2 ])local max_token = tonumber (info[3 ])local rate = tonumber (info[4 ])if type (last_time) ~= "number" then last_time = req_time max_token = 2 cur_token_num = max_token rate = 1 redis.pcall ("HMSET" , key, "last_time" , req_time, "cur_token_num" , cur_token_num) redis.pcall ("HMSET" , key, "max_token" , max_token, "rate" , rate) end local result = 0 if tonumber (need_token) > tonumber (max_token) then result = 0 end local new_gen_token = math .floor ((req_time - last_time) / 1000 ) * ratelocal cur_token_num = math .min (cur_token_num + new_gen_token, max_token)if tonumber (need_token) > tonumber (cur_token_num) then result = 0 else cur_token_num = cur_token_num - need_token result = 1 redis.pcall ("HMSET" , key, "last_time" , req_time, "cur_token_num" , cur_token_num) end return result