限流器
令牌桶
基本概念
令牌桶常用的限流算法之一,用于保护自己的系统,主要用来对调用者频率进行限流,为的是不让自己的系统被打垮。
漏桶算法,用来保护他人,也就是保护他所调用的系统。主要场景,当调用的第三方系统本身没有保护机制,或者有流量限制的时候,我们调用的速度不能超过他的限制,在主调方控制。
如果保护自己的系统不被打垮,用令牌桶。如果保证别人的系统不被打垮,漏桶算法。
今天主要讲令牌桶算法。令牌桶主要处理逻辑结构示意图如下
Guava 采用的是令牌桶算法,其核心是要想通过限流器,必须拿到令牌。也就是说,只要我们能够限制发放令牌的速率,那么就能控制流速了。令牌桶算法的详细描述如下:
- 令牌以固定的速率添加到令牌桶中,假设限流的速率是 r/ 秒,则令牌每 1/r 秒会添加一个;
- 假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃;
- 请求能够通过限流器的前提是令牌桶中有令牌。
Guava 令牌桶算法
基本实现
令牌桶这个算法,如何用 Java 实现呢?很可能你的直觉会告诉你生产者 - 消费者模式:一个生产者线程定时向阻塞队列中添加令牌,而试图通过限流器的线程则作为消费者线程,只有从阻塞队列中获取到令牌,才允许通过限流器。
这个算法看上去非常完美,而且实现起来非常简单,如果并发量不大,这个实现并没有什么问题。可实际情况却是使用限流的场景大部分都是高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题了。问题就出在定时器上,在高并发场景下,当系统压力已经临近极限的时候,定时器的精度误差会非常大,同时定时器本身会创建调度线程,也会对系统的性能产生影响。
Guava 实现令牌桶算法,用了一个很简单的办法,其关键是记录并动态计算下一令牌发放的时间。
抢占令牌时重新计算令牌桶中的令牌数量。如果当前时间到了下一次产生令牌的时间则,更新下一次产生令牌时间,并将令牌数量增加。简易的代码如下
class SimpleLimiter {
//当前令牌桶中的令牌数量
long storedPermits = 0;
//令牌桶的容量
long maxPermits = 3;
//下一令牌产生时间
long next = System.nanoTime();
//发放令牌间隔:纳秒
long interval = 1000_000_000;
//请求时间在下一令牌产生时间之后,则
// 1.重新计算令牌桶中的令牌数
// 2.将下一个令牌发放时间重置为当前时间
void resync(long now) {
if (now > next) {
//新产生的令牌数
long newPermits=(now-next)/interval;
//新令牌增加到令牌桶
storedPermits=min(maxPermits,
storedPermits + newPermits);
//将下一个令牌发放时间重置为当前时间
next = now;
}
}
//预占令牌,返回能够获取令牌的时间
synchronized long reserve(long now){
resync(now);
//能够获取令牌的时间
long at = next;
//令牌桶中能提供的令牌
long fb=min(1, storedPermits);
//令牌净需求:首先减掉令牌桶中的令牌
long nr = 1 - fb;
//重新计算下一令牌产生时间
next = next + nr*interval;
//重新计算令牌桶中的令牌
this.storedPermits -= fb;
return at;
}
//申请令牌
void acquire() {
//申请令牌时的时间
long now = System.nanoTime();
//预占令牌
long at=reserve(now);
long waitTime=max(at-now, 0);
//按照条件等待
if(waitTime > 0) {
try {
TimeUnit.NANOSECONDS
.sleep(waitTime);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
很精髓的就是reserve方法,稍微解释一下
首先肯定是计算令牌桶里面的令牌数量
然后取令牌桶中的令牌数量storedPermits 与当前的需要的令牌数量 1 做比较,大于等于 1,说明令牌桶至少有一个令牌,此时下一令牌的获取是不需要等待的,表现为 next 不需要变化;而当令牌桶中的令牌没有了即storedPermits等于 0 时,next 就会变化为下一个令牌的获取时间,注意 nr 的值变化。
平滑预热限流
Guava还提供了平滑预热功能。当流量突然增大的时候,我们常常会希望系统从空闲状态到繁忙状态的切换的时间长一些。即如果系统在此之前长期处于空闲的状态,我们希望处理请求的数量是缓步的增多,经过预期的时间以后,到达系统处理请求个数的最大值。Warm Up(冷启动,预热)模式就是为了实现这个目的的。
SmoothWarmingUp
实现预热缓冲的关键在于其分发令牌的速率会随时间和令牌数而改变,速率会先慢后快。令牌刷新的时间间隔由长逐渐变短。等存储令牌数从maxPermits到达thresholdPermits时,发放令牌的时间价格也由coldInterval降低到了正常的stableInterval。
This implements the following function where coldInterval = coldFactor * stableInterval.
^ throttling
|
cold + /
interval | /.
| / .
| / . ← "warmup period" is the area of the trapezoid between
| / . thresholdPermits and maxPermits
| / .
| / .
| / .
stable +----------/ WARM .
interval | . UP .
| . PERIOD.
| . .
0 +----------+-------+--------------→ storedPermits
0 thresholdPermits maxPermits
Before going into the details of this particular function, let's keep in mind the basics:
1. The state of the RateLimiter (storedPermits) is a vertical line in this figure.
2. When the RateLimiter is not used, this goes right (up to maxPermits)
3. When the RateLimiter is used, this goes left (down to zero), since if we have storedPermits, we serve from those first
4. When _unused_, we go right at a constant rate! The rate at which we move to the right is chosen as maxPermits / warmupPeriod. This ensures that the time it takes to go from 0 to maxPermits is equal to warmupPeriod.
5. When _used_, the time it takes, as explained in the introductory class note, is equal to the integral of our function, between X permits and X-K permits, assuming we want to spend K saved permits.
实际运用
最近在编写一个限流的组件,其中变使用Guava提供的令牌桶。会为每一个接口创造一个限流的对象,具体对象封装如下。其中的使用的是Guava提供的平滑预热的RateLimiter。主要调用接口在调用前都会尝试调用tryAcquire()方法。
public class TokenBucketAlg implements RateLimitAlg{
/**
* guava的令牌桶限流算法
*/
private RateLimiter rateLimiter;
/**
* 创建一个多少秒内最多多少次请求的令牌桶
* 预热模式
* @param limit 次数
* @param unit 多少秒
*/
public TokenBucketAlg(int limit, int unit){
rateLimiter = RateLimiter.create(limit/(double)unit, unit, TimeUnit.SECONDS);
}
/**
* 每秒钟最多能有多少次访问
* @param limit
*/
public TokenBucketAlg(int limit){
rateLimiter = RateLimiter.create(limit, 10, TimeUnit.SECONDS);
}
@Override
public boolean tryAcquire() throws RuntimeException {
// 增加等待时间,以面对突然性qps增高了,而只是临时没有了令牌
if(rateLimiter.tryAcquire(1, 100000, MICROSECONDS)){
return true;
}
throw new LimitException("此接口被限流,此次访问失败");
}
}
看了trateLimiter.tryAcquire(1, 100000, MICROSECONDS) 后你可能会想为什么要尝试这个等待时间内能不能获取到令牌。根据前面所说的令牌桶实现。假设限流的速率是 r/ 秒,则令牌每 1/r 秒会添加一个。但如果1/r秒来了2个则有一个就会失败,而其后可能不会有请求进来。这就会导致本来应该处理的请求直接被拒接了。
如果直接使用trateLimiter.tryAcquire()经过压测也确实如此我设置的每秒50的访问,一压发现根本到不了,50并发的压测被限流的多达30%。于是我调用了其拥有等待时间的接口。
关于这个接口你可能会觉得,是不是会把调用线程一直阻塞住。这样就照样占用了tomcat的线程资源。
一开始我也害怕会这样,因为这样的话限流相当于没有限制住。源码之下无秘密
tryAcquire
/**
* Acquires the given number of permits from this {@code RateLimiter} if it can be obtained
* without exceeding the specified {@code timeout}, or returns {@code false} immediately (without
* waiting) if the permits would not have been granted before the timeout expired.
*
* @param permits the number of permits to acquire
* @param timeout the maximum time to wait for the permits. Negative values are treated as zero.
* @param unit the time unit of the timeout argument
* @return {@code true} if the permits were acquired, {@code false} otherwise
* @throws IllegalArgumentException if the requested number of permits is negative or zero
*/
@SuppressWarnings("GoodTime") // should accept a java.time.Duration
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
// 主要逻辑之一 尝试在timeout时间段内能不能获取到令牌
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
//能获取到则 重置一些数据
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
// 将调用者休眠microsToWait
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
/**
* Returns the earliest time that permits are available (with one caveat).
*
* @return the time that permits are available, or, if permits are available immediately, an
* arbitrary past or present time
*/
@Override
final long queryEarliestAvailable(long nowMicros) {
// 获取能得到对应数量的令牌的时间
return nextFreeTicketMicros;
}
/**
* Reserves the requested number of permits and returns the time that those permits can be used
* (with one caveat).
*
* @return the time that the permits may be used, or, if the permits may be used immediately, an
* arbitrary past or present time
*/
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
canAcquire(nowMicros, timeoutMicros)方法,很重要。尝试在timeout时间段内能不能获取到令牌,如果在timeout里面获取不到则直接返回。
reserveEarliestAvailable 重置了nextFreeTicketMicros,更新了令牌桶令牌的数量。
**通过源码分析,你可以知道他会判断在timeOut时间内能不能获取到令牌,获取不到则直接返回了。如果再timeout时间内能获取到,但是数量不足则会休眠到令牌数量的时间(因为产生令牌速率固定,于是时间也是可计算的)。再执行。**于是也就避免了我之前害怕的场景,且使得伸缩性也强一些。
通过压测也确实到达了我想要的效果。