分布式限流

1. 定义

分布式区别于单机限流的场景,它把整个分布式环境中所有服务当做一个整体来考量。比如说针对IP的限流,我们限制了1个IP每秒最多10个访问,不管来自这个IP的请求落在了哪台机器上,只要是访问了集群中的服务节点,那么都都会受到限流规则的制约。

从上面的例子不难看出,我们必须将限流信息保存在一个”中心化的组件上,这样它就可以获取到集群中所有机器的访问状态,目前前有两个比较主流的限流方案:

  • 网关层限流将限流规则应用在所有流量的入口处
  • 中间件限流 将限流信息存储在分布式环境中某个中间件里(比如Redis缓存),每个组件都可以从这里获取到当前时刻的流量统试,从而决定是拒绝服务还是放行流量

2. 常见的限流规则

  1. QPS和连接数控制

    • 针对连接数和QPS(querypersecond)限流来说, 我们可以设定IP维度的限流, 也可以设置基于单个服务器的限流。
    • 在真实环境中通常会设置多个维度的限流规则, 比如设定同一个IP每秒访问频率小于10, 连接数小于5,再设定每台机器QPS最高1000,连接数最大保持200。更进一步,我们可以把某个服务器组或整个机房的服务器当做一个整体, 设置更hign-level的限流规则, 这些所有限流规则都会共同作用于洗量控制。
  2. 传输速率

    • 对于”传输速率”大家都不会陌生, 比如资源的下载速度。有的网站在这方面的限流逻辑做的更细致,比如普通注册用户下载速度为100k/s,购买会员后是10M/S, 这背后就是基于用户组或者用户标签的限流逻辑。
  3. 黑白名单

    • 黑白名单是各个大型企业应用里很常见的限流和放行手段, 而且黑白名单往往是动态变化的。举个例子,如果某个IP在一段时间的的访问次数过于频繁,被系统识别为机器人用户或流量攻击,那么这个IP就会被加入到黑名单,从而限制其对系统资源的访问,这就是我们俗称的”封IP”。

      我们平时见到的爬虫程序,比如说爬知乎上的美女图片,或者爬券商系统的股票分时信息,这类爬虫程序都必须实现更换IP的功能,以防被加入黑名单。有时我们还会发现公司的网络无法访问12306这类大型公共网站, 这也是因为了某些公司的出网IP是同一个地址, 因此在访问量过高的情况下,这个IP地址就被对方系统识别,进而被添加到了黑名单。使用家庭宽带的同学们应该知道,大部分网络运营商都会将用户分配到不同出网IP段,或者时不时动态更换用户的IP地址。

    • 白名单就更好理解了,相当于御赐金牌在身,可以自由穿梭在各种限流规则里,畅行无阻。比如某些电商公司会将超大卖家的账号加入白名单,因为这类卖家往往有自己的一套运维系统,需要对接公司的IT系统做大量的商品发布、补货等等操作。

同学们肯定发现了这样一种情况,在春运抢票的时候,当你面对这么一堆验证码图片,不管你怎么选,即使你用毕生所学学选出了正确答案,提交后依然都会被告知你选 错了。要么就是让你面对一堆鬼都看不出是什么东西的图片。不要不疑自己的智商,其实,这就是网站的一种别样的限流措施。在拷问河用户智商的同时,通过这种”故 意”刁难的手段,光明正大地限制访问流量,从而大幅降低系统的访问压力,真不得不敬佩产品经理的智(良)慧(心)。

3. 限流实现方案

1. guava RateLimiter 客户端限流

1. 定义

Guava 的 RateLimiter 是一个轻量级的客户端限流工具,适用于单机环境下的流量控制。它基于令牌桶算法实现,能够平滑地限制请求速率。

核心特性:

  • 支持固定的生成速率(QPS)。
  • 支持突发请求(允许短时间内超出速率限制)。
  • 非阻塞式和阻塞式两种获取令牌的方式。

2. 原理

令牌桶算法:系统以固定速率生成令牌,每个请求需要消耗一个令牌。如果没有足够的令牌,则请求被阻塞或拒绝。

3. 使用

  1. 引入依赖
1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
  1. 代码实战
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Controller {
RateLimiter limiter = RateLimiter.create(2.0);

//场景一:非阻塞限流
@GetMapping("/tryAcquire")
public String tryAcquire(Integer count) {
if(limiter.tryAcquire(count)){ // count:一次占用几个令牌
log.info("success, rate is {}", limiter.getRate());
return "success";
} else {
log.info("fail, rate is {}", limiter.getRate());
return "fail";
}
}

//场景二:限定时间的非阻塞限流, 如:看在指定时间内,能否满足我的调用请求
@GetMapping("/tryAcquireWithTimeout")
public String tryAcquireWithTimeout(Integer countInteger timeout){
if (limiter.tryAcquire(count, timeout, TimeUnit.SSECONDS))
log.info("success, rate is {}", limiter.getRate())
return "success";
} else {
log.info("fail, rate is {}", limiter.getRate());
return "fail";
}
}


//场景三:同步阻塞限流。能获取就获取,获取不到就阻塞
@GetMapping("/acquire")
public String acquire(Integer count) {
limiter.acquire(count);
log.info("success, rate is {}", limiter.getRate());
return "success";
}
}

2. 基于Nginx限流

1. 配置单机限流(类似IP限流)

Nginx 提供了内置的限流模块,可以在网关层实现分布式限流。

1. 优点

  • 简单高效,适合网关层限流。
  • 不需要修改后端业务代码。

2. 实现步骤

  1. 修改nginx.conf
1
2
3
4
5
6
7
8
9
10
limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;

server {
server_name www.imooc-training.com;
location /access-limit/ {
proxy_pass http://127.0.0.1:10086/;
limit_req zone=iplimit burst=2 nodelay;
limit_req_status 504;
}
}
  • limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;

    • $binary_remote_addr : binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流

    • zone=iplimit:20m: iplimit是一块内存区域(记录访问频率信息),20m是指这块内有存区域的大小为20Mb

    • rate=1r/s.: 比如100r/m,标识访问的限流频率

  • limit_req zone=iplimit burst=2 nodelay;

    • zone=iplimit: 引用limit_req_zone中的zone变量
    • burst=2,设置一个大小为2的缓冲区域,当大量请求到来。请求数量超过限流频率时,将其放入缓冲区域
    • nodelay=>缓冲区满了以后,直接返回503异常
  • limit_req_status 504

    • 异常情况,返回504(默认是503)
  1. 测试controller
1
2
3
4
5
@GetMapping("/nginx")
public String nginx(){
log.info("Nginx success");
return "success";
}

2. 基于服务器级别限流

不管什么IP加在一起不能超过某个值,通常情况下,server级别的限流速率是最大的。

1. 实现步骤

  1. 改niginx.conf
1
2
limit_req_zone $server_name zone=serverlimit:10m rate=1r/s;
limit_req zone=serverlimit burst=1 nodeelay;

3. 基于连接数

当前active的连接数

1. 实现步骤

  1. 改niginx.conf
1
2
3
4
5
6
#每个server最多保持100个连接
limit_conn perserver 100;

limit_conn_zone $binary_remote_addr zone=perip:20m;
#每个IP地址最多保持1个连接
limit_conn perip 1;
  1. 测试
1
2
3
4
5
6
7
8
@GetMapping("/nginx-conn")
public String nginxConn(@RequestParam(defaultValue = "0") int secs) {
try {
Thread.sleep( millis: 1000 * secs);
} catch (Exception e) {
}
return "success";
}

3. redis+Lua

1. 定义

利用Redis原子操作,通过Lua脚本实现令牌生成与获取,避免并发问题。

2. 实现步骤

  1. 编写Lua限流脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 获取方法签名特征
local methodKey = KEYS[1]
redis.log(redis.LOG_DEBUG, 'key is', methodKey)

-- 调用脚本传入的限流大小
local limit = tonumber(ARGV[1])

-- 获取当前流量大小
local count = tonumber(redis.call('get', methodKey) or "0")

-- 是否超出限流阈值
if count + 1 > limit then
-- 拒绝服务访问
return false
else
-- 没有超过阈值
-- 设置当前访问的数量+1
redis.call("INCRBY", methodKey, 1)
-- 设置过期时间
redis.call("EXPIRE", methodKey, 1)
-- 放行
return true
end
  1. Java 集成Lua和Redis
1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.springcloud;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

/**
* Created by 半仙.
*/
@Configuration
public class RedisConfiguration {

// 如果本地也配置了StringRedisTemplate,可能会产生冲突
// 可以指定@Primary,或者指定加载特定的@Qualifier
@Bean
public RedisTemplate<String, String> redisTemplate(
RedisConnectionFactory factory) {
return new StringRedisTemplate(factory);
}

@Bean
public DefaultRedisScript loadRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setLocation(new ClassPathResource("ratelimiter.lua"));
redisScript.setResultType(java.lang.Boolean.class);
return redisScript;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.imooc.springcloud.annotation;

import java.lang.annotation.*;

/**
* Created by 半仙.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AccessLimiter {

int limit();

String methodKey() default "";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.imooc.springcloud.annotation;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Created by 半仙.
*/
@Slf4j
@Aspect
@Component
public class AccessLimiterAspect {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Autowired
private RedisScript<Boolean> rateLimitLua;

@Pointcut("@annotation(com.imooc.springcloud.annotation.AccessLimiter)")
public void cut() {
log.info("cut");
}

@Before("cut()")
public void before(JoinPoint joinPoint) {
// 1. 获得方法签名,作为method Key
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();

AccessLimiter annotation = method.getAnnotation(AccessLimiter.class);
if (annotation == null) {
return;
}

String key = annotation.methodKey();
Integer limit = annotation.limit();

// 如果没设置methodkey, 从调用方法签名生成自动一个key
if (StringUtils.isEmpty(key)) {
Class[] type = method.getParameterTypes();
key = method.getClass() + method.getName();

if (type != null) {
String paramTypes = Arrays.stream(type)
.map(Class::getName)
.collect(Collectors.joining(","));
log.info("param types: " + paramTypes);
key += "#" + paramTypes;
}
}

// 2. 调用Redis
boolean acquired = stringRedisTemplate.execute(
rateLimitLua, // Lua script的真身
Lists.newArrayList(key), // Lua脚本中的Key列表
limit.toString() // Lua脚本Value列表
);

if (!acquired) {
log.error("your access is blocked, key={}", key);
throw new RuntimeException("Your access is blocked");
}
}

}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.imooc.springcloud;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* Created by 半仙.
*/
@RestController
@Slf4j
public class Controller {

@Autowired
private AccessLimiter accessLimiter;

@GetMapping("test")
public String test() {
accessLimiter.limitAccess("ratelimiter-test", 3);
return "success";
}

// 提醒! 注意配置扫包路径(com.imooc.springcloud路径不同)
@GetMapping("test-annotation")
@com.imooc.springcloud.annotation.AccessLimiter(limit = 1)
public String testAnnotation() {
return "success";
}

}

分布式限流
http://example.com/分布式限流/
作者
Panyurou
发布于
2025年4月15日
许可协议