并发编程 限流器的实现

本文将介绍限流器的实现方式,并说明 Guava 对 RateLimiter 的实现。

一、限流

1. 为什么要限流?

限流的目的是限制并发量,以保证自身系统或下游系统不被巨型流量冲垮。

2. 常用限流方案

(1) 计数器算法

对请求数进行计数,超过请求数限制数则不允许通过,经过单位时间后清空计数。

缺点:

  • 在单位时间的开始时,可能会出现访问的峰值

    请求快速涌进来,快速用完请求限制,导致 “请求限制数” 个请求一同发出

(2) 漏桶算法

维护一个队列,队列有一定的容量,将请求逐个放入队列中(队列已满则丢弃或拒绝);另外有一个消费者,它将以固定速度从队列中读取请求并消费。

缺点:

  • 处理速度恒定,无法在流量较多时适当增加处理速度
  • 假如并发请求非常多,可能使队列满,进而导致请求丢失

(3) 令牌桶算法

维护一个令牌桶,令牌桶有一定的容量,生产者会以恒定速度生成令牌并放入令牌桶中(如果令牌桶已满,则丢弃或拒绝);所有请求都需要首先获取令牌才能执行,否则将被拒绝。

优点:

  • 与漏桶算法相比,允许一定的突发流量,同时又不会让突发流量过大

    突发流量数 = 令牌桶容量

二、什么是 RateLimiter?

1. 什么是 Guava?

Guava 是谷歌提供的 Java 类库,支持了集合、并发、IO、缓存等功能。

2. 什么是 RateLimiter?

RateLimiter 是 Guava 提供的工具类,是对限流器的实现。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 限流器流速:2个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
// 线程池
ExecutorService es = Executors.newFixedThreadPool(1);

while (true) {
// 限流器限流
limiter.acquire();
// 提交任务
es.execute(()->{
···
});
}

通过这个代码,可以使任务提交到线程池的时间间隔保持在半秒左右。

三、RateLimiter 的实现

1. 限流方案选择

RateLimiter 采用的是令牌桶算法。

2. 不可用的生产者 - 消费者模式

一个容易想到的是实现方式是生产者 - 消费者模式:一个生产者线程根据定时器定时添加令牌,请求者作为消费者获取令牌。

这种方式实际上是不可用的,原因是:

  • 在高并发场景下,系统的压力可能临近极限,此时定时器的误差将会非常大
  • 定时器本身会创建调度线程,会对系统性能产生影响

3. Guava 实现 - 简单无存储令牌桶

具体逻辑是:

  • 维护下一令牌产生时间
  • 如果申请令牌,
    • 如果”申请令牌的时间”晚于”下一令牌产生时间”:
      • 无需等待,直接获取令牌
      • 下一令牌产生时间 = 申请令牌的时间 + 时间间隔
    • 如果”申请令牌的时间”早于”下一令牌产生时间”:
      • 等待,等待时间为 下一令牌产生时间 - 申请令牌的时间
      • 下一令牌产生时间 = 下一令牌产生时间 + 时间间隔
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class SimpleRateLimiter {

/**
* 下一令牌产生时间
*/
long next = System.nanoTime();

/**
* 发放令牌时间间隔
*/
long interval = 1000_000_000;

/**
* 申请令牌
*/
public void acquire() {
// 申请令牌的时间
long now = System.nanoTime();
// 能够获取到令牌的时间
long at = reserve(now);
// 需要进行的等待时间
long waitTime = Math.max(at - now, 0);
// 等待
if (waitTime > 0) {
try {
TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 预占据令牌
* @param now 申请令牌的时间
* @return 能够获取到令牌的时间
* 如果"申请令牌的时间"晚于"下一令牌产生时间":
* 1. 无需等待,直接获取令牌
* 2. `下一令牌产生时间 = 申请令牌的时间 + 时间间隔`
* 如果"申请令牌的时间"早于"下一令牌产生时间":
* 1. 需要等待,等待时间为 `下一令牌产生时间 -申请令牌的时间 `
* 2. `下一令牌产生时间 = 下一令牌产生时间 + 时间间隔`
*/
private synchronized long reserve(long now) {
if (now > next) {
next = now;
}
// 能够获取到令牌的时间
long at = next;
// 设置下一令牌产生时间
next += interval;
// 返回能够获取到令牌的时间
return at;
}

}

4. Guava 实现 - 令牌桶可存储

与上一实现的区别在于:

  • 允许令牌桶存储令牌

具体逻辑是:

  • 维护下一令牌产生时间
  • 维护令牌数
  • 如果申请令牌,
    • 维护令牌桶
      • 如果”申请令牌的时间”晚于”下一令牌产生时间”,重新计算令牌数,调整 “下一令牌产生时间”
    • 如果令牌桶中有剩余令牌,则直接获取,更新令牌数
    • 如果令牌桶中没有剩余令牌,则等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class SimpleRateLimiter2 {

/**
* 令牌数
*/
long storedPermits = 0;

/**
* 令牌桶的容量
*/
long maxPermits = 3;

/**
* 下一令牌产生时间
*/
long next = System.nanoTime();

/**
* 发放令牌时间间隔
*/
long interval = 1000_000_000;

/**
* 申请令牌
*/
public void acquire() {
// 申请令牌的时间
long now = System.nanoTime();
// 能够获取到令牌的时间
long at = reserve(now);
// 需要进行的等待时间
long waitTime = Math.max(at - now, 0);
// 等待
if (waitTime > 0) {
try {
TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 预占据令牌
* @param now 申请令牌的时间
* @return 能够获取到令牌的时间
*/
private synchronized long reserve(long now){
reSync(now);
// 能够获取到令牌的时间
long at = next;
// 是否直接从令牌桶中获取令牌
boolean takeFromBucket = storedPermits != 0;
// 如果从令牌桶中获取令牌,则令牌数递减
// 否则,下一令牌被预占据,下一令牌产生时间延后
if (!takeFromBucket) {
next += interval;
} else {
storedPermits--;
}
return at;
}

/**
* 维护令牌桶
* 如果"申请令牌的时间"晚于"下一令牌产生时间":
* 1. 重新计算令牌数
* 2. 由于桶中已经有令牌,因此可直接获取,"下一令牌产生时间"重置为当前时间
*/
private void reSync(long now) {
if (now > next) {
// 重新计算令牌数
storedPermits = Math.max(maxPermits, storedPermits + (now - next) / interval);
// 下一令牌产生时间重置为当前时间
next = now;
}
}

}

参考