在分布式系统中,亿级流量的冲击从来都不是小概率事件——大促峰值、热点事件、爬虫攻击、下游故障连锁反应,任何一个环节的失控,都可能引发全链路雪崩,最终导致核心业务不可用。流量治理的核心价值,就是在不可控的流量波动中,为系统构建一套可控的防护体系,保障核心业务的持续可用。
一、流量治理三剑客的本质与边界
很多开发者在落地流量治理时,会混淆限流、熔断、降级的适用场景,最终导致防护失效。三者的核心定位、触发时机、解决的核心问题完全不同,我们先明确三者的核心边界,从根源上避免概念混淆:
- 限流:事前防护,核心是管控进入系统的请求速率与总量,确保流量不超过系统的承载上限,解决的是“流量过多”的问题,防护对象是系统自身。
- 熔断:事中止损,核心是当依赖的下游服务出现故障时,主动切断调用链路,防止故障向上游蔓延引发雪崩,解决的是“依赖故障”的问题,防护对象是下游依赖。
- 降级:事后兜底,核心是当系统负载达到阈值时,主动关闭非核心功能,将有限的系统资源倾斜给核心业务,解决的是“资源不足”的问题,防护对象是核心业务。
三者并非替代关系,而是互补的分层防护体系,共同构成了分布式系统的流量治理护城河。
二、限流:入口流量的精准管控
2.1 限流的底层逻辑
任何系统的处理能力都是有上限的,这个上限由CPU、内存、IO、数据库连接数等硬件和软件资源共同决定。限流的本质,就是通过一套精准的流量管控规则,让进入系统的请求速率始终维持在系统承载上限之内,同时对超出阈值的请求执行快速拒绝,避免系统被突发流量打垮。
2.2 主流限流算法与实战实现
2.2.1 固定窗口计数器算法
核心原理:将时间划分为固定大小的窗口(如1秒),每个窗口内维护一个计数器,每进入一个请求计数器+1,当计数器达到阈值时,拒绝该窗口内的后续请求,窗口结束后计数器清零。核心特点:实现简单、内存占用极小;存在临界突刺缺陷,比如1秒窗口、阈值100的场景下,0.9秒-1.1秒之间可能通过200个请求,超出系统承载上限。适用场景:对流量精度要求不高的基础防护场景。
package com.jam.demo.limit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 固定窗口计数器限流实现
*
* @author ken
*/
@Slf4j
public class FixedWindowRateLimiter {
/**
* 时间窗口大小,单位毫秒
*/
private final long windowSize;
/**
* 窗口内最大请求数
*/
private final int maxCount;
/**
* 当前窗口计数器
*/
private final AtomicInteger counter = new AtomicInteger(0);
/**
* 当前窗口开始时间
*/
private volatile long windowStartTime;
public FixedWindowRateLimiter(long windowSize, int maxCount) {
if (windowSize <= 0 || maxCount <= 0) {
throw new IllegalArgumentException("窗口大小和最大请求数必须大于0");
}
this.windowSize = windowSize;
this.maxCount = maxCount;
this.windowStartTime = System.currentTimeMillis();
}
/**
* 尝试获取令牌,判断请求是否允许通过
*
* @return true-允许通过,false-触发限流
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 超出当前窗口,重置窗口和计数器
if (currentTime - windowStartTime > windowSize) {
windowStartTime = currentTime;
counter.set(0);
}
// 计数器未达阈值,允许通过
if (counter.get() < maxCount) {
counter.incrementAndGet();
return true;
}
// 触发限流
log.warn("固定窗口限流触发,窗口内请求数已达上限:{}", maxCount);
return false;
}
}
2.2.2 滑动窗口计数器算法
核心原理:将固定窗口拆分为多个更小的时间格子(如1秒窗口拆分为10个100ms的格子),每次请求到来时,只统计当前时间往前推一个窗口周期内的所有格子的请求总数,同时自动清空过期的时间格子。彻底解决了固定窗口的临界突刺问题。核心特点:流量统计精度高,无临界突刺缺陷;实现复杂度略高于固定窗口,内存占用稍高。适用场景:绝大多数单机限流场景,对流量精度要求较高的业务。
package com.jam.demo.limit;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 滑动窗口计数器限流实现
*
* @author ken
*/
@Slf4j
public class SlidingWindowRateLimiter {
/**
* 窗口总大小,单位毫秒
*/
private final long windowSize;
/**
* 每个小格子的大小,单位毫秒
*/
private final long gridSize;
/**
* 窗口内最大请求数
*/
private final int maxCount;
/**
* 存储每个小格子的请求数
*/
private final LinkedList<AtomicInteger> gridList = new LinkedList<>();
/**
* 窗口内总请求数
*/
private final AtomicInteger totalCount = new AtomicInteger(0);
/**
* 最后一个格子的开始时间
*/
private volatile long lastGridStartTime;
public SlidingWindowRateLimiter(long windowSize, int gridNum, int maxCount) {
if (windowSize <= 0 || gridNum <= 0 || maxCount <= 0 || windowSize % gridNum != 0) {
throw new IllegalArgumentException("窗口参数不合法");
}
this.windowSize = windowSize;
this.gridSize = windowSize / gridNum;
this.maxCount = maxCount;
this.lastGridStartTime = System.currentTimeMillis();
// 初始化第一个格子
gridList.add(new AtomicInteger(0));
}
/**
* 尝试获取令牌,判断请求是否允许通过
*
* @return true-允许通过,false-触发限流
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 计算当前时间与最后一个格子的时间差,生成新的格子
long timeOffset = currentTime - lastGridStartTime;
long gridOffset = timeOffset / gridSize;
// 生成新的格子
for (int i = 0; i < gridOffset; i++) {
gridList.addLast(new AtomicInteger(0));
lastGridStartTime += gridSize;
}
// 移除过期的格子
while (gridList.size() > windowSize / gridSize) {
AtomicInteger expiredGrid = gridList.removeFirst();
totalCount.addAndGet(-expiredGrid.get());
}
// 总请求数超过阈值,触发限流
if (totalCount.get() >= maxCount) {
log.warn("滑动窗口限流触发,窗口内请求数已达上限:{}", maxCount);
return false;
}
// 给当前最后一个格子计数+1
gridList.getLast().incrementAndGet();
totalCount.incrementAndGet();
return true;
}
}
2.2.3 漏桶算法
核心原理:请求像水一样进入漏桶,漏桶以固定的速率出水(处理请求),当桶满了之后,多余的水(请求)会被直接拒绝。核心是强制控制请求的处理速率,实现流量的绝对平滑整形。核心特点:流量绝对平滑,无任何突刺;无法应对突发流量,即使系统有空闲资源,也只能按固定速率处理请求。适用场景:需要严格控制输出速率的场景,如消息消费、第三方接口调用。
package com.jam.demo.limit;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicLong;
/**
* 漏桶算法限流实现
*
* @author ken
*/
@Slf4j
public class LeakyBucketRateLimiter {
/**
* 桶的最大容量
*/
private final long capacity;
/**
* 漏水速率,单位:请求数/毫秒
*/
private final double leakRate;
/**
* 当前桶内的水量
*/
private final AtomicLong currentWater = new AtomicLong(0);
/**
* 上次漏水的时间
*/
private volatile long lastLeakTime;
public LeakyBucketRateLimiter(long capacity, long leakCountPerSecond) {
if (capacity <= 0 || leakCountPerSecond <= 0) {
throw new IllegalArgumentException("桶容量和漏水速率必须大于0");
}
this.capacity = capacity;
this.leakRate = (double) leakCountPerSecond / 1000;
this.lastLeakTime = System.currentTimeMillis();
}
/**
* 尝试获取令牌,判断请求是否允许通过
*
* @return true-允许通过,false-触发限流
*/
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 计算时间差,执行漏水
long timeOffset = currentTime - lastLeakTime;
long leakWater = (long) (timeOffset * leakRate);
if (leakWater > 0) {
currentWater.set(Math.max(0, currentWater.get() - leakWater));
lastLeakTime = currentTime;
}
// 桶未满,允许请求进入
if (currentWater.get() < capacity) {
currentWater.incrementAndGet();
return true;
}
// 桶满,触发限流
log.warn("漏桶限流触发,桶容量已达上限:{}", capacity);
return false;
}
}
2.2.4 令牌桶算法
核心原理:系统以固定的速率向令牌桶中添加令牌,桶的容量有上限,当请求到来时,需要从桶中获取一个令牌,获取成功则处理请求,获取失败则拒绝请求。与漏桶算法的核心区别是:令牌桶允许突发流量,只要桶中有令牌,就可以一次性处理多个请求,同时也能限制平均请求速率。核心特点:既可以限制平均请求速率,又能应对合法的突发流量,灵活性极高,是工业界最常用的限流算法。适用场景:互联网业务的绝大多数接口限流、网关限流场景。
package com.jam.demo.limit;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicLong;
/**
* 令牌桶算法限流实现
*
* @author ken
*/
@Slf4j
public class TokenBucketRateLimiter {
/**
* 桶的最大容量
*/
private final long capacity;
/**
* 令牌生成速率,单位:令牌数/毫秒
*/
private final double tokenRate;
/**
* 当前桶内的令牌数
*/
private final AtomicLong currentToken = new AtomicLong(0);
/**
* 上次生成令牌的时间
*/
private volatile long lastTokenTime;
public TokenBucketRateLimiter(long capacity, long tokenCountPerSecond) {
if (capacity <= 0 || tokenCountPerSecond <= 0) {
throw new IllegalArgumentException("桶容量和令牌生成速率必须大于0");
}
this.capacity = capacity;
this.tokenRate = (double) tokenCountPerSecond / 1000;
this.lastTokenTime = System.currentTimeMillis();
// 初始化桶,填满令牌
currentToken.set(capacity);
}
/**
* 尝试获取令牌,判断请求是否允许通过
*
* @return true-允许通过,false-触发限流
*/
public synchronized boolean tryAcquire() {
return tryAcquire(1);
}
/**
* 尝试获取指定数量的令牌,判断请求是否允许通过
*
* @param needToken 需要的令牌数量
* @return true-允许通过,false-触发限流
*/
public synchronized boolean tryAcquire(int needToken) {
if (needToken <= 0 || needToken > capacity) {
throw new IllegalArgumentException("需要的令牌数不合法");
}
long currentTime = System.currentTimeMillis();
// 计算时间差,生成新的令牌
long timeOffset = currentTime - lastTokenTime;
long newToken = (long) (timeOffset * tokenRate);
if (newToken > 0) {
currentToken.set(Math.min(capacity, currentToken.get() + newToken));
lastTokenTime = currentTime;
}
// 令牌充足,获取成功
if (currentToken.get() >= needToken) {
currentToken.addAndGet(-needToken);
return true;
}
// 令牌不足,触发限流
log.warn("令牌桶限流触发,需要令牌数:{},当前可用令牌数:{}", needToken, currentToken.get());
return false;
}
}
2.3 分布式限流实战实现
单机限流只能控制单节点的流量,在集群环境下,亿级流量需要分布式限流来控制整个集群的总流量。分布式限流的核心是保证限流操作的原子性,我们采用Redis+Lua的方案实现:Redis的单线程模型保证了命令的串行执行,Lua脚本可以将多个Redis操作打包成一个原子操作,彻底避免并发场景下的限流失效问题。
2.3.1 限流Lua脚本
-- 限流key
local key = KEYS[1]
-- 窗口大小,单位毫秒
local windowSize = tonumber(ARGV[1])
-- 窗口内最大请求数
local maxCount = tonumber(ARGV[2])
-- 当前时间戳
local currentTime = tonumber(ARGV[3])
-- 移除窗口外的过期请求
redis.call('ZREMRANGEBYSCORE', key, 0, currentTime - windowSize)
-- 获取当前窗口内的请求数
local currentCount = redis.call('ZCARD', key)
-- 请求数超过阈值,返回0(限流)
if currentCount >= maxCount then
return 0
end
-- 请求数未达阈值,添加当前请求到有序集合
redis.call('ZADD', key, currentTime, currentTime .. '-' .. math.random())
-- 设置key的过期时间,避免冷key占用内存
redis.call('PEXPIRE', key, windowSize)
-- 返回1(通过)
return 1
2.3.2 Java分布式限流实现
package com.jam.demo.limit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.Collections;
import java.util.List;
/**
* 分布式限流实现(Redis+Lua)
*
* @author ken
*/
@Slf4j
@Component
public class DistributedRateLimiter {
private final StringRedisTemplate stringRedisTemplate;
private final DefaultRedisScript<Long> limitScript;
public DistributedRateLimiter(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.limitScript = new DefaultRedisScript<>();
this.limitScript.setResultType(Long.class);
this.limitScript.setScriptText("local key = KEYS[1]\nlocal windowSize = tonumber(ARGV[1])\nlocal maxCount = tonumber(ARGV[2])\nlocal currentTime = tonumber(ARGV[3])\nredis.call('ZREMRANGEBYSCORE', key, 0, currentTime - windowSize)\nlocal currentCount = redis.call('ZCARD', key)\nif currentCount >= maxCount then\n return 0\nend\nredis.call('ZADD', key, currentTime, currentTime .. '-' .. math.random())\nredis.call('PEXPIRE', key, windowSize)\nreturn 1");
}
/**
* 尝试获取分布式限流令牌
*
* @param limitKey 限流唯一标识
* @param windowSize 时间窗口大小,单位毫秒
* @param maxCount 窗口内最大请求数
* @return true-允许通过,false-触发限流
*/
public boolean tryAcquire(String limitKey, long windowSize, int maxCount) {
if (!org.springframework.util.StringUtils.hasText(limitKey) || windowSize <= 0 || maxCount <= 0) {
throw new IllegalArgumentException("限流参数不合法");
}
List<String> keys = Collections.singletonList(limitKey);
long currentTime = System.currentTimeMillis();
Long result = stringRedisTemplate.execute(limitScript, keys,
String.valueOf(windowSize),
String.valueOf(maxCount),
String.valueOf(currentTime));
if (ObjectUtils.isEmpty(result) || result == 0) {
log.warn("分布式限流触发,key:{},窗口内请求数已达上限:{}", limitKey, maxCount);
return false;
}
return true;
}
}
三、熔断:下游故障的隔离止损
3.1 熔断的底层逻辑
在分布式系统中,一个业务请求往往会依赖多个下游服务,当其中一个下游服务出现故障(响应超时、错误率飙升)时,上游服务的调用线程会被阻塞,大量的阻塞线程会耗尽上游服务的线程池资源,最终导致上游服务也不可用,故障会一层一层向上蔓延,引发全链路雪崩。
熔断的本质,就是故障隔离,基于熔断器模式,当下游服务的故障指标达到阈值时,主动切断对该服务的调用,直接执行降级逻辑,避免故障扩散,同时给下游服务足够的恢复时间。
3.2 熔断器核心状态机
熔断器的核心是状态机设计,包含三个核心状态,状态转换逻辑如下:
- 关闭状态(Closed):熔断器正常工作,请求正常调用下游服务,同时统计请求的错误率、慢调用比例等指标,当指标达到熔断阈值时,状态切换为打开状态。
- 打开状态(Open):熔断器切断对下游服务的调用,所有请求直接执行降级逻辑,不调用下游服务。经过设定的熔断等待时间后,状态切换为半开状态,探测下游服务是否恢复。
- 半开状态(Half-Open):熔断器允许少量的请求调用下游服务,同时统计这些请求的指标,如果请求正常,说明下游服务已经恢复,状态切换为关闭状态;如果请求仍然失败,说明下游服务还未恢复,状态切换回打开状态。
3.3 熔断实战实现
我们采用Resilience4j实现熔断能力,Resilience4j是轻量级、函数式的熔断组件,完全兼容JDK17,是Hystrix停更后的官方推荐替代方案。
3.3.1 熔断器配置类
package com.jam.demo.circuitbreak;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
/**
* 熔断器配置类
*
* @author ken
*/
@Slf4j
@Configuration
public class CircuitBreakConfig {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
// 全局默认熔断器配置
CircuitBreakerConfig defaultConfig = CircuitBreakerConfig.custom()
// 滑动窗口大小,单位:请求数
.slidingWindowSize(100)
// 滑动窗口类型:基于请求数计数
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
// 熔断触发的最小请求数,低于该数量不会触发熔断
.minimumNumberOfCalls(10)
// 错误率阈值,超过该比例触发熔断
.failureRateThreshold(50)
// 慢调用比例阈值,超过该比例触发熔断
.slowCallRateThreshold(80)
// 慢调用定义,响应时间超过该值视为慢调用
.slowCallDurationThreshold(Duration.ofMillis(500))
// 熔断打开状态的等待时间,之后进入半开状态
.waitDurationInOpenState(Duration.ofSeconds(5))
// 半开状态允许通过的探测请求数
.permittedNumberOfCallsInHalfOpenState(5)
// 记录为失败的异常类型
.recordExceptions(RuntimeException.class, Exception.class)
.build();
return CircuitBreakerRegistry.of(defaultConfig);
}
/**
* 订单服务熔断器
*/
@Bean
public CircuitBreaker orderServiceCircuitBreaker(CircuitBreakerRegistry registry) {
CircuitBreaker circuitBreaker = registry.circuitBreaker("orderService");
// 注册熔断器状态监听器
circuitBreaker.getEventPublisher()
.onStateTransition(event -> log.info("订单服务熔断器状态变更:{} -> {}",
event.getFromState(), event.getToState()))
.onError(event -> log.warn("订单服务熔断器捕获异常:", event.getThrowable()))
.onSuccess(event -> log.debug("订单服务熔断器调用成功"));
return circuitBreaker;
}
}
3.3.2 熔断业务实现
package com.jam.demo.service.impl;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.service.OrderService;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.vavr.control.Try;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;
import java.util.function.Supplier;
/**
* 订单服务实现类
*
* @author ken
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
private final OrderMapper orderMapper;
private final CircuitBreaker orderServiceCircuitBreaker;
private final TransactionTemplate transactionTemplate;
public OrderServiceImpl(OrderMapper orderMapper, CircuitBreaker orderServiceCircuitBreaker, TransactionTemplate transactionTemplate) {
this.orderMapper = orderMapper;
this.orderServiceCircuitBreaker = orderServiceCircuitBreaker;
this.transactionTemplate = transactionTemplate;
}
@Override
public Order getOrderById(Long orderId) {
// 包装业务逻辑为Supplier
Supplier<Order> supplier = () -> {
if (ObjectUtils.isEmpty(orderId) || orderId <= 0) {
throw new IllegalArgumentException("订单ID不合法");
}
return orderMapper.selectById(orderId);
};
// 装饰Supplier,添加熔断能力
Supplier<Order> decoratedSupplier = Decorators.ofSupplier(supplier)
.withCircuitBreaker(orderServiceCircuitBreaker)
.decorate();
// 执行调用,异常时执行降级逻辑
return Try.ofSupplier(decoratedSupplier)
.recover(this::getOrderByIdFallback)
.get();
}
/**
* 订单查询熔断降级逻辑
*
* @param throwable 异常信息
* @return 兜底订单数据
*/
private Order getOrderByIdFallback(Throwable throwable) {
log.error("订单查询触发熔断降级,异常信息:", throwable);
Order fallbackOrder = new Order();
fallbackOrder.setOrderId(0L);
fallbackOrder.setOrderStatus(0);
fallbackOrder.setGoodsName("系统繁忙,请稍后重试");
return fallbackOrder;
}
@Override
public Boolean createOrder(Order order) {
return transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
int insert = orderMapper.insert(order);
return insert > 0;
} catch (Exception e) {
status.setRollbackOnly();
log.error("订单创建失败,事务回滚", e);
throw e;
}
}
});
}
}
四、降级:系统负载的兜底保障
4.1 降级的底层逻辑
当系统的整体负载(CPU使用率、内存使用率、接口响应时间、数据库连接数等)达到预警阈值时,系统的处理能力会急剧下降,此时如果继续处理所有业务功能,会导致核心业务的响应变慢,甚至不可用。
降级的本质,就是资源倾斜,通过主动关闭非核心业务功能,释放系统资源,保障核心业务的正常运行。
4.2 主流降级方案与实战实现
4.2.1 功能降级:动态开关实现
功能降级是最常用的降级方式,通过动态配置中心实现降级开关的实时更新,无需重启服务即可快速关闭非核心功能。我们采用Nacos作为动态配置中心实现。
package com.jam.demo.degrade;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 降级开关配置
*
* @author ken
*/
@Slf4j
@Component
public class DegradeSwitchConfig {
/**
* 评论功能降级开关,true-开启降级,false-正常运行
*/
@NacosValue(value = "${degrade.switch.comment:false}", autoRefreshed = true)
private boolean commentDegradeSwitch;
/**
* 推荐功能降级开关,true-开启降级,false-正常运行
*/
@NacosValue(value = "${degrade.switch.recommend:false}", autoRefreshed = true)
private boolean recommendDegradeSwitch;
/**
* 积分功能降级开关,true-开启降级,false-正常运行
*/
@NacosValue(value = "${degrade.switch.point:false}", autoRefreshed = true)
private boolean pointDegradeSwitch;
public boolean isCommentDegrade() {
return commentDegradeSwitch;
}
public boolean isRecommendDegrade() {
return recommendDegradeSwitch;
}
public boolean isPointDegrade() {
return pointDegradeSwitch;
}
}
package com.jam.demo.service.impl;
import com.jam.demo.degrade.DegradeSwitchConfig;
import com.jam.demo.service.CommentService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
/**
* 评论服务实现类
*
* @author ken
*/
@Slf4j
@Service
public class CommentServiceImpl implements CommentService {
private final DegradeSwitchConfig degradeSwitchConfig;
public CommentServiceImpl(DegradeSwitchConfig degradeSwitchConfig) {
this.degradeSwitchConfig = degradeSwitchConfig;
}
@Override
public List<String> getCommentList(Long goodsId) {
// 触发降级,直接返回兜底数据
if (degradeSwitchConfig.isCommentDegrade()) {
log.warn("评论功能已触发降级,返回兜底数据");
return Collections.emptyList();
}
// 正常业务逻辑
return List.of("商品质量很好", "物流速度很快", "推荐购买");
}
}
4.2.2 读降级:多级兜底实现
读降级的核心是构建多级读链路,当底层存储压力过大时,逐级降级,最终返回兜底静态数据,保障读接口的可用性。读链路优先级:主库读 -> 从库读 -> Redis缓存读 -> 本地缓存读 -> 兜底静态数据。
package com.jam.demo.service.impl;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.jam.demo.entity.Goods;
import com.jam.demo.mapper.GoodsMapper;
import com.jam.demo.service.GoodsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import com.alibaba.fastjson2.JSON;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
* 商品服务实现类
*
* @author ken
*/
@Slf4j
@Service
public class GoodsServiceImpl implements GoodsService {
private final GoodsMapper goodsMapper;
private final StringRedisTemplate stringRedisTemplate;
/**
* 本地缓存,Caffeine实现
*/
private final Cache<Long, Goods> localCache;
/**
* Redis缓存key前缀
*/
private static final String GOODS_CACHE_PREFIX = "goods:info:";
/**
* 缓存过期时间
*/
private static final long CACHE_EXPIRE_SECONDS = 3600;
public GoodsServiceImpl(GoodsMapper goodsMapper, StringRedisTemplate stringRedisTemplate) {
this.goodsMapper = goodsMapper;
this.stringRedisTemplate = stringRedisTemplate;
this.localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(Duration.ofMinutes(5))
.build();
}
@Override
public Goods getGoodsById(Long goodsId) {
if (ObjectUtils.isEmpty(goodsId) || goodsId <= 0) {
throw new IllegalArgumentException("商品ID不合法");
}
// 1. 先查本地缓存
Goods goods = localCache.getIfPresent(goodsId);
if (!ObjectUtils.isEmpty(goods)) {
log.debug("本地缓存命中,goodsId:{}", goodsId);
return goods;
}
// 2. 本地缓存未命中,查Redis缓存
String cacheKey = GOODS_CACHE_PREFIX + goodsId;
String cacheValue = stringRedisTemplate.opsForValue().get(cacheKey);
if (org.springframework.util.StringUtils.hasText(cacheValue)) {
goods = JSON.parseObject(cacheValue, Goods.class);
localCache.put(goodsId, goods);
log.debug("Redis缓存命中,goodsId:{}", goodsId);
return goods;
}
// 3. Redis缓存未命中,查数据库
try {
goods = goodsMapper.selectById(goodsId);
} catch (Exception e) {
log.error("数据库查询异常,触发读降级,goodsId:{}", goodsId, e);
return getGoodsByIdFallback(goodsId);
}
// 4. 数据库查询结果回写缓存
if (!ObjectUtils.isEmpty(goods)) {
stringRedisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(goods), CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
localCache.put(goodsId, goods);
}
return goods;
}
/**
* 商品查询读降级兜底逻辑
*
* @param goodsId 商品ID
* @return 兜底商品数据
*/
private Goods getGoodsByIdFallback(Long goodsId) {
Goods fallbackGoods = new Goods();
fallbackGoods.setGoodsId(goodsId);
fallbackGoods.setGoodsName("商品信息加载中");
fallbackGoods.setGoodsPrice(0L);
fallbackGoods.setStockNum(0);
return fallbackGoods;
}
}
4.2.3 写降级:同步改异步实现
写降级的核心是将同步写数据库的操作,降级为异步写,先写缓存/MQ,再通过异步线程落库,削峰填谷的同时,提升用户的响应体验。我们采用RocketMQ实现异步写降级。
package com.jam.demo.service.impl;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import com.jam.demo.service.OrderAsyncService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
/**
* 订单异步服务实现类
*
* @author ken
*/
@Slf4j
@Service
public class OrderAsyncServiceImpl implements OrderAsyncService {
private final RocketMQTemplate rocketMQTemplate;
private final OrderMapper orderMapper;
private static final String ORDER_CREATE_TOPIC = "order_create_topic";
public OrderAsyncServiceImpl(RocketMQTemplate rocketMQTemplate, OrderMapper orderMapper) {
this.rocketMQTemplate = rocketMQTemplate;
this.orderMapper = orderMapper;
}
@Override
public Boolean asyncCreateOrder(Order order) {
try {
// 发送消息到RocketMQ,异步落库
rocketMQTemplate.convertAndSend(ORDER_CREATE_TOPIC, JSON.toJSONString(order));
log.info("订单异步创建消息发送成功,orderId:{}", order.getOrderId());
return true;
} catch (Exception e) {
log.error("订单异步创建消息发送失败,orderId:{}", order.getOrderId(), e);
return false;
}
}
@Override
public void saveOrderFromMq(Order order) {
orderMapper.insert(order);
log.info("订单异步落库成功,orderId:{}", order.getOrderId());
}
}
package com.jam.demo.mq;
import com.jam.demo.entity.Order;
import com.jam.demo.service.OrderAsyncService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson2.JSON;
import org.springframework.util.ObjectUtils;
/**
* 订单创建消息消费者
*
* @author ken
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_create_topic", consumerGroup = "order_create_consumer_group")
public class OrderCreateConsumer implements RocketMQListener<String> {
private final OrderAsyncService orderAsyncService;
public OrderCreateConsumer(OrderAsyncService orderAsyncService) {
this.orderAsyncService = orderAsyncService;
}
@Override
public void onMessage(String message) {
if (!org.springframework.util.StringUtils.hasText(message)) {
log.warn("订单创建消息为空,跳过处理");
return;
}
Order order = JSON.parseObject(message, Order.class);
if (ObjectUtils.isEmpty(order) || ObjectUtils.isEmpty(order.getOrderId())) {
log.warn("订单创建消息解析失败,message:{}", message);
return;
}
orderAsyncService.saveOrderFromMq(order);
}
}
五、全链路流量治理架构设计与流程落地
5.1 全链路流量治理架构
我们构建从用户流量入口到数据存储的全链路分层防护架构,层层拦截、层层兜底,确保亿级流量下系统的稳定可用。
各层级防护能力说明:
- CDN/静态资源层:静态资源全量缓存,拦截90%以上的静态资源请求,是流量治理的第一道防线。
- 四层负载层:TCP层面的流量管控,实现黑名单IP拦截、单IP连接数限制、恶意流量清洗,拦截非法攻击流量。
- 网关层:集群入口的核心防护层,实现全集群分布式限流、统一的熔断降级规则、请求路由与流量管控,是流量治理的核心防线。
- 业务服务层:精细化流量管控,实现接口级别的单机限流、服务间调用的熔断、业务功能的动态降级,是流量治理的业务落地层。
- 中间件与存储层:缓存、消息队列、数据库的底层资源防护,实现连接数限制、慢调用熔断、消费限流,是流量治理的兜底防线。
- 动态配置中心:所有限流、熔断、降级规则的动态更新,无需重启服务即可快速调整防护策略。
- 全链路监控中心:实时采集全链路流量指标、规则触发情况、系统负载数据,实现故障告警与策略优化,是流量治理的眼睛。
5.2 全链路流量治理处理流程
全链路流程的核心设计思路:
- 流量越早拦截,对系统的影响越小,优先在网关层拦截超出阈值的流量,避免无效请求进入业务集群。
- 每一层都有独立的防护能力,即使上层防护失效,下层也能提供兜底防护,避免单点故障引发全链路雪崩。
- 所有异常场景都有兜底响应,不会直接返回错误,保障用户体验的同时,避免异常扩散。
六、项目依赖与基础配置
6.1 Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<groupId>com.jam.demo</groupId>
<artifactId>flow-governance-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>flow-governance-demo</name>
<description>亿级流量治理实战项目</description>
<properties>
<java.version>17</java.version>
<resilience4j.version>2.2.0</resilience4j.version>
<mybatis-plus.version>3.5.6</mybatis-plus.version>
<nacos.version>2.3.2</nacos.version>
<rocketmq.version>2.3.0</rocketmq.version>
<fastjson2.version>2.0.52</fastjson2.version>
<caffeine.version>3.1.8</caffeine.version>
<guava.version>32.1.3-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-config-spring-boot3</artifactId>
<version>${nacos.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
6.2 MySQL 8.0 表结构SQL
CREATE TABLE `t_order` (
`order_id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`goods_id` bigint NOT NULL COMMENT '商品ID',
`goods_name` varchar(255) NOT NULL COMMENT '商品名称',
`goods_price` bigint NOT NULL COMMENT '商品价格,单位分',
`order_status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`order_id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表';
CREATE TABLE `t_goods` (
`goods_id` bigint NOT NULL AUTO_INCREMENT COMMENT '商品ID',
`goods_name` varchar(255) NOT NULL COMMENT '商品名称',
`goods_price` bigint NOT NULL COMMENT '商品价格,单位分',
`stock_num` int NOT NULL DEFAULT '0' COMMENT '库存数量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='商品表';
6.3 实体类与Mapper
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 订单实体类
*
* @author ken
*/
@Data
@TableName("t_order")
@Schema(description = "订单实体")
public class Order {
@TableId(type = IdType.AUTO)
@Schema(description = "订单ID")
private Long orderId;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "商品ID")
private Long goodsId;
@Schema(description = "商品名称")
private String goodsName;
@Schema(description = "商品价格,单位分")
private Long goodsPrice;
@Schema(description = "订单状态:0-待支付,1-已支付,2-已取消")
private Integer orderStatus;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 商品实体类
*
* @author ken
*/
@Data
@TableName("t_goods")
@Schema(description = "商品实体")
public class Goods {
@TableId(type = IdType.AUTO)
@Schema(description = "商品ID")
private Long goodsId;
@Schema(description = "商品名称")
private String goodsName;
@Schema(description = "商品价格,单位分")
private Long goodsPrice;
@Schema(description = "库存数量")
private Integer stockNum;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper
*
* @author ken
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Goods;
import org.apache.ibatis.annotations.Mapper;
/**
* 商品Mapper
*
* @author ken
*/
@Mapper
public interface GoodsMapper extends BaseMapper<Goods> {
}
6.4 业务接口Controller
package com.jam.demo.controller;
import com.jam.demo.entity.Order;
import com.jam.demo.limit.DistributedRateLimiter;
import com.jam.demo.limit.TokenBucketRateLimiter;
import com.jam.demo.service.CommentService;
import com.jam.demo.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* 订单接口控制器
*
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/order")
@Tag(name = "订单接口", description = "订单相关业务接口,包含限流、熔断、降级能力")
public class OrderController {
private final OrderService orderService;
private final CommentService commentService;
private final DistributedRateLimiter distributedRateLimiter;
private final TokenBucketRateLimiter tokenBucketRateLimiter;
/**
* 分布式限流key前缀
*/
private static final String ORDER_LIMIT_KEY = "order:limit:";
/**
* 限流窗口大小,1秒
*/
private static final long LIMIT_WINDOW_SIZE = 1000;
/**
* 窗口内最大请求数
*/
private static final int LIMIT_MAX_COUNT = 100;
public OrderController(OrderService orderService, CommentService commentService, DistributedRateLimiter distributedRateLimiter) {
this.orderService = orderService;
this.commentService = commentService;
this.distributedRateLimiter = distributedRateLimiter;
this.tokenBucketRateLimiter = new TokenBucketRateLimiter(1000, 100);
}
@GetMapping("/{orderId}")
@Operation(summary = "查询订单详情", description = "包含熔断降级能力")
public ResponseEntity<Order> getOrderById(
@Parameter(description = "订单ID", required = true) @PathVariable Long orderId) {
Order order = orderService.getOrderById(orderId);
return ResponseEntity.ok(order);
}
@PostMapping("/create")
@Operation(summary = "创建订单", description = "包含分布式限流能力")
public ResponseEntity<Boolean> createOrder(
@Parameter(description = "订单信息", required = true) @RequestBody Order order) {
// 分布式限流校验
String limitKey = ORDER_LIMIT_KEY + order.getUserId();
if (!distributedRateLimiter.tryAcquire(limitKey, LIMIT_WINDOW_SIZE, LIMIT_MAX_COUNT)) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(false);
}
// 单机令牌桶限流校验
if (!tokenBucketRateLimiter.tryAcquire()) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(false);
}
Boolean result = orderService.createOrder(order);
return ResponseEntity.ok(result);
}
@GetMapping("/comment/{goodsId}")
@Operation(summary = "查询商品评论列表", description = "包含功能降级能力")
public ResponseEntity<List<String>> getCommentList(
@Parameter(description = "商品ID", required = true) @PathVariable Long goodsId) {
List<String> commentList = commentService.getCommentList(goodsId);
return ResponseEntity.ok(commentList);
}
}
七、生产环境避坑指南与最佳实践
7.1 核心避坑指南
- 限流阈值设置不合理:阈值设置过高起不到防护作用,设置过低会误杀正常请求。必须通过全链路压测获取系统的真实承载上限,阈值设置为上限的70%-80%,预留充足的缓冲空间。
- 熔断超时时间不匹配:熔断器的慢调用阈值必须小于调用方的超时时间,否则熔断器还没触发,调用方已经超时,线程已经被阻塞,完全起不到故障隔离的作用。
- 降级逻辑引入新故障:降级逻辑必须是轻量级、无外部依赖的,不能在降级逻辑中调用其他有风险的服务,否则会引发二次故障,加剧系统雪崩。
- 分布式限流原子性缺失:分布式限流必须保证操作的原子性,禁止使用Redis的get+set分开操作,必须采用Lua脚本或Redis原子命令,否则并发场景下会出现限流完全失效。
- 重试导致流量放大:下游服务出现故障时,上游的重试机制会导致流量放大数倍,加剧故障扩散。必须在熔断的同时限制重试次数,熔断打开状态下禁止重试。
- 全链路监控缺失:没有监控的流量治理就是盲人骑瞎马,必须实时监控限流、熔断、降级的触发次数、请求量、错误率等核心指标,及时发现异常并调整策略。
7.2 落地最佳实践
- 全链路压测先行:所有流量治理规则的阈值,必须基于全链路压测的结果设置,禁止凭经验拍脑袋设置,确保规则符合系统的真实承载能力。
- 动态配置优先:所有限流、熔断、降级的规则,必须支持动态更新,无需重启服务,应对突发流量时可以快速调整策略,降低故障影响面。
- 灰度发布验证:流量治理规则上线时,必须先对小部分流量灰度生效,验证规则正确、无业务影响后,再逐步全量发布,避免规则错误导致全量业务不可用。
- 多维度分层防护:不能只依赖单一的防护手段,必须构建从入口层到存储层的全链路、多维度防护体系,层层拦截、层层兜底,避免单点防护失效引发全链路故障。
- 常态化故障演练:通过混沌工程常态化注入故障,验证流量治理规则的有效性,确保故障发生时,规则能正常触发,系统能稳定运行。
- 兜底逻辑优先:所有的限流、熔断、降级场景,都必须设计轻量级的兜底响应逻辑,禁止直接返回500错误,在保障系统稳定的同时,尽可能提升用户体验。
流量治理不是一劳永逸的配置,而是一个持续优化的过程。需要结合业务发展、系统迭代、流量变化,持续调整优化防护策略,才能真正扛住亿级流量的冲击,保障系统的长期稳定可用。