Service Governance: Rate Limiting, the "Flow Valve" of Microservice Architecture

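Summary: Rate limiting is a core technique for keeping a system stable: it caps the request rate so the system is not overloaded. This article explains the principles and use cases of the fixed window, sliding window, leaky bucket, and token bucket algorithms, then implements application-level rate limiting with Sentinel and distributed rate limiting with Redis. It also covers custom rate limiters, dynamic threshold adjustment, and monitoring and alerting, building multi-layer protection that keeps the system reliable and responsive under high concurrency.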

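1. Core Concepts of Rate Limiting

1.1 What Is Rate Limiting?

Rate limiting caps the number of requests a system will process per unit of time, protecting it from traffic that exceeds its capacity.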


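// A real-world analogy for rate limiting
public class RateLimitingAnalogy {
    
    /**
     * Traffic system vs. rate limiting system
     */
    public class TrafficSystemComparison {
        // Traffic light → rate limit controller
        // Highway toll booth → token bucket algorithm
        // Traffic volume statistics → request counter
        // Traffic control → rate limiting policy
    }
    
    /**
     * Risks of running without rate limiting
     */
    public class WithoutRateLimiting {
        // 1. Resource exhaustion: a traffic burst uses up CPU, memory, and database connections
        // 2. Cascading failure: one crashed service triggers a chain reaction
        // 3. Security risk: a DDoS attack makes the service unavailable
        // 4. User experience: the system responds slowly or not at all
    }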
}

1.2 Core Goals of Rate Limiting


2. Rate Limiting Algorithms in Depth

2.1 Comparison of Mainstream Algorithms

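Algorithm | Principle | Advantages | Disadvantages | Typical use
--- | --- | --- | --- | ---
Fixed window | Counts requests in fixed time windows | Simple to implement | Boundary bursts, not smooth | Simple rate limiting
Sliding window | Aggregates counts over several sub-windows | Relatively precise | More complex to implement | Precise control
Leaky bucket | Processes requests at a constant rate | Shapes traffic | Cannot absorb bursts | Smoothing traffic
Token bucket | Generates tokens at a fixed rate | Allows bursts | More complex to implement | Most scenarios
Adaptive | Adjusts the threshold dynamically | Self-tuning | Complex algorithm | Dynamic environments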
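The "boundary burst" weakness of the fixed window is easier to see with concrete numbers. The following is a minimal sketch, not taken from the article: the class names and the caller-supplied `nowMs` timestamp are assumptions made so the effect can be replayed deterministically.

```java
/** Fixed-window counter with an explicit clock so the boundary effect is reproducible. */
class FixedWindowCounter {
    private final int limit;
    private final long windowMs;
    private long windowStart = -1;
    private int count = 0;

    FixedWindowCounter(int limit, long windowMs) {
        this.limit = limit;
        this.windowMs = windowMs;
    }

    /** nowMs is a caller-supplied timestamp (hypothetical parameter, used instead of the system clock). */
    synchronized boolean tryAcquire(long nowMs) {
        long start = nowMs / windowMs * windowMs;
        if (start != windowStart) {   // entered a new window: reset the counter
            windowStart = start;
            count = 0;
        }
        if (count >= limit) {
            return false;
        }
        count++;
        return true;
    }
}

class FixedWindowBoundaryDemo {
    /** Returns how many of `requests` calls issued at time `nowMs` are admitted. */
    static int admitted(FixedWindowCounter counter, long nowMs, int requests) {
        int ok = 0;
        for (int i = 0; i < requests; i++) {
            if (counter.tryAcquire(nowMs)) {
                ok++;
            }
        }
        return ok;
    }
}
```

With a limit of 100 per 1000 ms, sending 150 requests at t = 990 ms and another 150 at t = 1010 ms admits 100 in each window, so 200 requests pass within 20 ms even though the nominal limit is 100 per second.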

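2.2 Rate Limiting Algorithm Architecture

3. Rate Limiting in Practice with Spring Cloud Alibaba Sentinel

3.1 Environment Setup and Dependencies

<!-- pom.xml: Sentinel rate limiting dependencies -->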
<dependencies>
    <!-- Sentinel Core -->
    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-core</artifactId>
        <version>1.8.6</version>
    </dependency>
    
    <!-- Sentinel Spring Cloud Integration -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        <version>2022.0.0.0</version>
    </dependency>
    
    <!-- Sentinel Datasource Nacos -->
    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-datasource-nacos</artifactId>
        <version>1.8.6</version>
    </dependency>
    
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
    <!-- AOP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
</dependencies>

3.2 Basic Sentinel Configuration

# application.yml: Sentinel configuration
spring:
  application:
    name: order-service
  cloud:
    sentinel:
      enabled: true
      eager: true
      transport:
        dashboard: 192.168.1.100:8080  # Sentinel dashboard
        port: 8719
      # Data source for flow rules
      datasource:
        flow-rule:
          nacos:
            server-addr: 192.168.1.100:8848
            dataId: ${spring.application.name}-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow
            namespace: sentinel-config
# Logging
logging:
  level:
    com.alibaba.csp.sentinel: INFO
    com.alibaba.cloud.sentinel: DEBUG
# Actuator endpoints
management:
  endpoints:
    web:
      exposure:
        include: sentinel,health,metrics
  endpoint:
    sentinel:
      enabled: true

3.3 Basic Rate Limiting in Practice


// Order service controller with rate limiting
@RestController
@RequestMapping("/api/orders")
@Slf4j
public class OrderRateLimitController {
    
    @Autowired
    private OrderService orderService;
    
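    /**
     * Create an order: QPS-based flow control.
     * The block handler and the fallback both live in OrderRateLimitHandlers,
     * so blockHandlerClass and fallbackClass must both point at it.
     */
    @PostMapping
    @SentinelResource(
        value = "createOrder",
        blockHandler = "createOrderBlockHandler",
        blockHandlerClass = OrderRateLimitHandlers.class,
        fallback = "createOrderFallback",
        fallbackClass = OrderRateLimitHandlers.class
    )
    public ResponseEntity<OrderDTO> createOrder(@RequestBody @Valid OrderCreateRequest request) {
        log.info("Create order request: user {}, product {}, quantity {}", 
            request.getUserId(), request.getProductId(), request.getQuantity());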
        
        OrderDTO order = orderService.createOrder(request);
        return ResponseEntity.ok(order);
    }
    
    /**
     * Get order detail: QPS-based flow control with queueing
     */
    @GetMapping("/{orderId}")
    @SentinelResource(
        value = "getOrderDetail",
        blockHandler = "getOrderBlockHandler",
        fallback = "getOrderFallback"
    )
    public ResponseEntity<OrderDTO> getOrder(@PathVariable String orderId) {
        log.info("Get order detail: {}", orderId);
        
        OrderDTO order = orderService.getOrderDetail(orderId);
        return ResponseEntity.ok(order);
    }
    
    /**
     * List orders: flow control by concurrent thread count
     */
    @GetMapping
    @SentinelResource(
        value = "listOrders",
        blockHandler = "listOrdersBlockHandler"
    )
    public ResponseEntity<PageResult<OrderDTO>> listOrders(
            @RequestParam(defaultValue = "1") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        log.info("List orders: page {}, {} per page", page, size);
        
        PageResult<OrderDTO> orders = orderService.listOrders(page, size);
        return ResponseEntity.ok(orders);
    }
    
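    // Block handlers: same parameters as the resource method plus a trailing BlockException.
    // Declared as ResponseEntity<?> so the body can carry RateLimitResponse;
    // the raw return type still matches the resource method.
    public ResponseEntity<?> getOrderBlockHandler(String orderId, BlockException ex) {
        log.warn("Get order blocked by flow control, order ID: {}, rule: {}", orderId, ex.getRule());
        
        RateLimitResponse response = RateLimitResponse.builder()
            .code(429)
            .message("The system is busy, please retry later")
            .retryAfter(5) // retry after 5 seconds
            .build();
            
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(response);
    }
    
    public ResponseEntity<OrderDTO> getOrderFallback(String orderId, Throwable ex) {
        log.error("Get order fell back, order ID: {}", orderId, ex);
        return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(null);
    }
    
    // Parameter types must match the resource method exactly (int, not Integer),
    // otherwise Sentinel cannot resolve the handler
    public ResponseEntity<PageResult<OrderDTO>> listOrdersBlockHandler(
            int page, int size, BlockException ex) {
        
        log.warn("List orders blocked by flow control, page: {}, rule: {}", page, ex.getRule());
        
        // Return an empty page instead of an error for a better user experience
        PageResult<OrderDTO> emptyResult = PageResult.empty();
        return ResponseEntity.ok(emptyResult);
    }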
}
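// Shared handler class (methods referenced through blockHandlerClass/fallbackClass must be static)
@Slf4j
public class OrderRateLimitHandlers {
    
    /**
     * Block handler for order creation.
     * Declared as ResponseEntity<?> so the body can carry RateLimitResponse;
     * the raw return type still matches the resource method.
     */
    public static ResponseEntity<?> createOrderBlockHandler(
            OrderCreateRequest request, BlockException ex) {
        
        log.warn("Create order blocked by flow control, user: {}, rule: {}", 
            request.getUserId(), ex.getRule());
            
        RateLimitResponse response = RateLimitResponse.builder()
            .code(429)
            .message("Too many users are placing orders right now, please retry later")
            .retryAfter(10)
            .suggestTime(LocalDateTime.now().plusSeconds(30))
            .build();
            
        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(response);
    }
    
    /**
     * Fallback for order creation
     */
    public static ResponseEntity<OrderDTO> createOrderFallback(
            OrderCreateRequest request, Throwable ex) {
        
        log.error("Create order fell back, request: {}", request, ex);
        
        // Choose the degraded response by exception type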
        if (ex instanceof TimeoutException) {
            return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
                .body(null);
        } else if (ex instanceof ServiceUnavailableException) {
            return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
                .body(null);
        } else {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(null);
        }
    }
}
// Rate limit response payload
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RateLimitResponse {
    private int code;
    private String message;
    private Integer retryAfter; // seconds to wait before retrying
    private LocalDateTime suggestTime; // suggested retry time
    private String requestId;
    
    @Builder.Default
    private long timestamp = System.currentTimeMillis();
}

3.4 Advanced Flow Rule Configuration


// Sentinel flow rule configuration
@Component
@Slf4j
public class SentinelRateLimitConfig {
    
    /**
     * Initialize flow rules
     */
    @PostConstruct
    public void initFlowRules() {
        // FlowRuleManager.loadRules replaces the whole rule set, so start from the
        // rules already loaded to avoid discarding rules registered elsewhere
        List<FlowRule> rules = new ArrayList<>(FlowRuleManager.getRules());
        
        // 1. Create order: QPS limit with warm-up
        FlowRule createOrderRule = new FlowRule();
        createOrderRule.setResource("createOrder");
        createOrderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        createOrderRule.setCount(100); // threshold: 100 QPS
        createOrderRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
        createOrderRule.setWarmUpPeriodSec(10); // warm up over 10 seconds
        createOrderRule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        rules.add(createOrderRule);
        
        // 2. Get order detail: QPS limit with queueing
        FlowRule getOrderRule = new FlowRule();
        getOrderRule.setResource("getOrderDetail");
        getOrderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        getOrderRule.setCount(500); // threshold: 500 QPS
        getOrderRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
        getOrderRule.setMaxQueueingTimeMs(1000); // queue for at most 1 second
        rules.add(getOrderRule);
        
        // 3. List orders: thread count limit
        FlowRule listOrdersRule = new FlowRule();
        listOrdersRule.setResource("listOrders");
        listOrdersRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        listOrdersRule.setCount(50); // at most 50 concurrent threads
        rules.add(listOrdersRule);
        
        // 4. Related-resource limit: createOrder is throttled once getOrderDetail exceeds 50 QPS
        FlowRule relationRule = new FlowRule();
        relationRule.setResource("createOrder");
        relationRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        relationRule.setCount(50);
        relationRule.setRefResource("getOrderDetail");
        relationRule.setStrategy(RuleConstant.STRATEGY_RELATE);
        rules.add(relationRule);
        
        FlowRuleManager.loadRules(rules);
        log.info("Sentinel flow rules initialized, rule count: {}", rules.size());
    }
    
    /**
     * Hot-parameter flow rules
     */
    @PostConstruct
    public void initParamFlowRules() {
        List<ParamFlowRule> paramRules = new ArrayList<>();
        
        // Hot-parameter limit for product detail
        ParamFlowRule productRule = new ParamFlowRule("getProductDetail")
            .setParamIdx(0) // first argument (product ID)
            .setCount(10)   // 10 QPS per product
            .setGrade(RuleConstant.FLOW_GRADE_QPS);
        
        // Separate limits for specific products
        ParamFlowItem hotProduct1 = new ParamFlowItem()
            .setObject("HOT_PRODUCT_001")
            .setClassType(String.class.getName())
            .setCount(1); // best-selling product limited to 1 QPS
            
        ParamFlowItem hotProduct2 = new ParamFlowItem()
            .setObject("HOT_PRODUCT_002")
            .setClassType(String.class.getName())
            .setCount(1);
        
        productRule.setParamFlowItemList(Arrays.asList(hotProduct1, hotProduct2));
        paramRules.add(productRule);
        
        ParamFlowRuleManager.loadRules(paramRules);
        log.info("Hot-parameter flow rules initialized");
    }
    
    /**
     * Cluster flow control rule
     */
    @PostConstruct
    public void initClusterFlowRules() {
        // Cluster flow control settings
        ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
        clusterConfig.setFlowId(1L);
        clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL); // count is a cluster-wide total
        clusterConfig.setFallbackToLocalWhenFail(true);
        
        // Global rule
        FlowRule clusterRule = new FlowRule();
        clusterRule.setResource("globalCreateOrder");
        clusterRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        clusterRule.setCount(1000); // cluster-wide threshold: 1000 QPS
        clusterRule.setClusterMode(true);
        clusterRule.setClusterConfig(clusterConfig);
        
        // loadRules replaces the whole rule set, so append to the rules already loaded
        List<FlowRule> rules = new ArrayList<>(FlowRuleManager.getRules());
        rules.add(clusterRule);
        FlowRuleManager.loadRules(rules);
        log.info("Cluster flow rule initialized");
    }
}
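// Dynamic rule manager
@Service
@Slf4j
public class DynamicRateLimitManager {
    
    @Autowired
    private NacosConfigManager nacosConfigManager;
    
    /**
     * Dynamically update the direct QPS rule of a resource
     */
    public void updateFlowRule(String resource, double qpsThreshold) {
        List<FlowRule> current = FlowRuleManager.getRules();
        List<FlowRule> updated = new ArrayList<>(current.size() + 1);
        boolean found = false;
        
        for (FlowRule rule : current) {
            // A resource may carry several rules (for example a related-resource rule),
            // so match only its direct QPS rule
            boolean isTarget = !found
                && resource.equals(rule.getResource())
                && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS
                && rule.getStrategy() == RuleConstant.STRATEGY_DIRECT;
            if (isTarget) {
                // Swap in a modified copy rather than mutating the live rule, so the
                // reloaded list differs from the one already in effect
                updated.add(copyWithCount(rule, qpsThreshold));
                found = true;
                log.info("Updated flow rule: {} -> {} QPS", resource, qpsThreshold);
            } else {
                updated.add(rule);
            }
        }
        
        if (!found) {
            FlowRule newRule = new FlowRule();
            newRule.setResource(resource);
            newRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
            newRule.setCount(qpsThreshold);
            updated.add(newRule);
            log.info("Created flow rule: {} -> {} QPS", resource, qpsThreshold);
        }
        
        FlowRuleManager.loadRules(updated);
    }
    
    private FlowRule copyWithCount(FlowRule source, double count) {
        FlowRule copy = new FlowRule();
        copy.setResource(source.getResource());
        copy.setLimitApp(source.getLimitApp());
        copy.setGrade(source.getGrade());
        copy.setStrategy(source.getStrategy());
        copy.setRefResource(source.getRefResource());
        copy.setControlBehavior(source.getControlBehavior());
        copy.setWarmUpPeriodSec(source.getWarmUpPeriodSec());
        copy.setMaxQueueingTimeMs(source.getMaxQueueingTimeMs());
        copy.setClusterMode(source.isClusterMode());
        copy.setClusterConfig(source.getClusterConfig());
        copy.setCount(count);
        return copy;
    }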
    
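    /**
     * Adjust thresholds according to system load.
     * Scheduling must be enabled (@EnableScheduling) for this task to run.
     */
    @Scheduled(fixedRate = 60000) // re-evaluate once per minute
    public void adaptiveRateLimit() {
        double systemLoad = getSystemLoadAverage();
        double cpuUsage = getCpuUsage();
        
        if (systemLoad > 4.0 || cpuUsage > 0.8) {
            // High load: lower the thresholds
            updateFlowRule("createOrder", 50);
            updateFlowRule("getOrderDetail", 200);
            log.warn("System load is high, lowering flow thresholds");
        } else if (systemLoad >= 0 && systemLoad < 1.0 && cpuUsage >= 0 && cpuUsage < 0.3) {
            // Low load: raise the thresholds. Both metrics are negative when the
            // platform cannot report them, which must not be read as "idle".
            updateFlowRule("createOrder", 200);
            updateFlowRule("getOrderDetail", 800);
            log.info("System load is low, raising flow thresholds");
        }
    }
    
    private double getSystemLoadAverage() {
        // System load average over the last minute; negative if unavailable
        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
    }
    
    private double getCpuUsage() {
        // System-wide CPU usage in [0, 1]; negative if unavailable
        com.sun.management.OperatingSystemMXBean osBean = 
            (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        return osBean.getSystemCpuLoad();
    }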
}
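The threshold decision in the scheduled task above can be isolated from Sentinel so that it is easy to test. The following is a sketch under assumptions: the class and method names are hypothetical, the load is normalized per CPU core (the article compares the raw load average against fixed numbers), and the halve/double factors mirror the 100 → 50 / 200 values used for `createOrder`.

```java
/** Hypothetical helper: picks a QPS threshold from load metrics without touching Sentinel. */
class AdaptiveThreshold {
    /**
     * @param base        threshold used under normal load
     * @param loadPerCore system load average divided by CPU cores; negative means "unavailable"
     * @param cpuUsage    CPU usage in [0, 1]; negative means "unavailable"
     */
    static int choose(int base, double loadPerCore, double cpuUsage) {
        boolean high = loadPerCore > 1.0 || cpuUsage > 0.8;
        boolean low = loadPerCore >= 0 && loadPerCore < 0.25
                   && cpuUsage >= 0 && cpuUsage < 0.3;
        if (high) {
            return Math.max(1, base / 2);   // shed load, but never drop to zero
        }
        if (low) {
            return base * 2;                // spare capacity: admit more
        }
        return base;                        // middle band, or metrics unavailable
    }
}
```

Keeping a middle band in which the threshold is left unchanged avoids flipping between values on every scheduling tick when the load hovers near a cutoff.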

4. Implementing Custom Rate Limiters

4.1 Token Bucket Rate Limiter


// Token bucket rate limiter
@Component
@Slf4j
public class TokenBucketRateLimiter {
    
    private final Map<String, TokenBucket> buckets = new ConcurrentHashMap<>();
    private final ScheduledExecutorService refillScheduler = Executors.newScheduledThreadPool(1);
    
    /**
     * Token bucket configuration
     */
    @Data
    @AllArgsConstructor
    public static class TokenBucketConfig {
        private int capacity;          // bucket capacity
        private int refillTokens;      // tokens added per refill
        private long refillIntervalMs; // refill interval (milliseconds)
    }
    
    /**
     * Token bucket instance
     */
    private static class TokenBucket {
        private final String key;
        private final AtomicInteger tokens;
        private final int capacity;
        private final int refillTokens;
        private final long refillIntervalMs;
        private volatile long lastRefillTime;
        
        public TokenBucket(String key, TokenBucketConfig config) {
            this.key = key;
            this.capacity = config.getCapacity();
            this.refillTokens = config.getRefillTokens();
            this.refillIntervalMs = config.getRefillIntervalMs();
            this.tokens = new AtomicInteger(capacity);
            this.lastRefillTime = System.currentTimeMillis();
        }
        
        public boolean tryAcquire(int tokensRequired) {
            refill();
            
            while (true) {
                int currentTokens = tokens.get();
                if (currentTokens < tokensRequired) {
                    return false;
                }
                
                if (tokens.compareAndSet(currentTokens, currentTokens - tokensRequired)) {
                    return true;
                }
            }
        }
        
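        // synchronized so that concurrent callers cannot credit the same elapsed interval twice
        private synchronized void refill() {
            long currentTime = System.currentTimeMillis();
            long elapsedIntervals = (currentTime - lastRefillTime) / refillIntervalMs;
            
            if (elapsedIntervals > 0) {
                // Cap in long arithmetic before narrowing, to avoid int overflow after long idle periods
                int refillCount = (int) Math.min((long) capacity, elapsedIntervals * refillTokens);
                
                tokens.updateAndGet(current -> Math.min(capacity, current + refillCount));
                // Advance by whole intervals only, so leftover time counts toward the next refill
                lastRefillTime += elapsedIntervals * refillIntervalMs;
                
                log.debug("Refilled tokens: {} -> {}", key, tokens.get());
            }
        }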
        
        public int getAvailableTokens() {
            refill();
            return tokens.get();
        }
    }
    
    public TokenBucketRateLimiter() {
        // Start the periodic cleanup task
        refillScheduler.scheduleAtFixedRate(this::cleanupIdleBuckets, 1, 1, TimeUnit.HOURS);
    }
    
    /**
     * Try to acquire one token
     */
    public boolean tryAcquire(String key) {
        return tryAcquire(key, 1);
    }
    
    /**
     * Try to acquire the given number of tokens
     */
    public boolean tryAcquire(String key, int tokensRequired) {
        TokenBucket bucket = buckets.computeIfAbsent(key, k -> 
            new TokenBucket(k, new TokenBucketConfig(100, 10, 1000))); // default configuration
        
        boolean acquired = bucket.tryAcquire(tokensRequired);
        
        if (!acquired) {
            log.debug("Token bucket limit hit: {}, available tokens: {}", key, bucket.getAvailableTokens());
        }
        
        return acquired;
    }
    
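    /**
     * Register a custom token bucket for the key.
     * An existing bucket is kept as-is: replacing it would hand out a full bucket
     * again, and the aspect calls this method on every intercepted request.
     */
    public void createBucket(String key, TokenBucketConfig config) {
        TokenBucket existing = buckets.putIfAbsent(key, new TokenBucket(key, config));
        if (existing == null) {
            log.info("Created token bucket: {}, capacity: {}", key, config.getCapacity());
        }
    }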
    
    /**
     * Get the number of available tokens
     */
    public int getAvailableTokens(String key) {
        TokenBucket bucket = buckets.get(key);
        return bucket != null ? bucket.getAvailableTokens() : 0;
    }
    
    /**
     * Clean up idle token buckets
     */
    private void cleanupIdleBuckets() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, TokenBucket>> iterator = buckets.entrySet().iterator();
        
        while (iterator.hasNext()) {
            Map.Entry<String, TokenBucket> entry = iterator.next();
            // Buckets could be evicted based on last-used time or a similar policy.
            // Simplified here: nothing is removed.
        }
        
        log.info("Token bucket cleanup finished, current bucket count: {}", buckets.size());
    }
    
    @PreDestroy
    public void destroy() {
        refillScheduler.shutdown();
    }
}
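// Token bucket rate limit annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TokenBucketRateLimit {
    String key();               // rate limit key (parsed as a SpEL expression)
    int capacity() default 100; // bucket capacity
    int refillTokens() default 10; // tokens added per refill
    long refillIntervalMs() default 1000; // refill interval in milliseconds
    int tokensRequired() default 1; // tokens consumed per request
    String message() default "The system is busy, please retry later";
}
// Token bucket rate limit aspect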
@Aspect
@Component
@Slf4j
public class TokenBucketRateLimitAspect {
    
    @Autowired
    private TokenBucketRateLimiter rateLimiter;
    
    /**
     * Around advice that applies the token bucket limit
     */
    @Around("@annotation(rateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, TokenBucketRateLimit rateLimit) throws Throwable {
        String key = buildRateLimitKey(joinPoint, rateLimit.key());
        
        // Register the bucket for this key with the annotation's settings
        TokenBucketRateLimiter.TokenBucketConfig config = 
            new TokenBucketRateLimiter.TokenBucketConfig(
                rateLimit.capacity(),
                rateLimit.refillTokens(),
                rateLimit.refillIntervalMs()
            );
        rateLimiter.createBucket(key, config);
        
        // Try to acquire tokens
        if (!rateLimiter.tryAcquire(key, rateLimit.tokensRequired())) {
            log.warn("Token bucket limit triggered: {}", key);
            throw new RateLimitException(rateLimit.message());
        }
        
        return joinPoint.proceed();
    }
    
    private String buildRateLimitKey(ProceedingJoinPoint joinPoint, String keyTemplate) {
        // Evaluate the SpEL expression to build the rate limit key
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Object[] args = joinPoint.getArgs();
        
        ExpressionParser parser = new SpelExpressionParser();
        Expression expression = parser.parseExpression(keyTemplate);
        
        EvaluationContext context = new StandardEvaluationContext();
        context.setVariable("args", args);
        context.setVariable("method", method);
        
        // Expose method arguments by parameter name (requires compiling with -parameters)
        Parameter[] parameters = method.getParameters();
        for (int i = 0; i < parameters.length; i++) {
            context.setVariable(parameters[i].getName(), args[i]);
        }
        
        return expression.getValue(context, String.class);
    }
}
// Rate limit exception
public class RateLimitException extends RuntimeException {
    private final int code;
    private final long retryAfter;
    
    public RateLimitException(String message) {
        this(429, message, 5);
    }
    
    public RateLimitException(int code, String message, long retryAfter) {
        super(message);
        this.code = code;
        this.retryAfter = retryAfter;
    }
    
    // getters...
}
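To see how capacity and refill rate interact, it helps to replay a token bucket against a controlled clock. The following is a minimal sketch rather than the limiter above: the class name is hypothetical, and the clock is injected as a `LongSupplier` (an assumption made for determinism) instead of calling `System.currentTimeMillis()`.

```java
import java.util.function.LongSupplier;

/** Minimal lazy-refill token bucket; the clock is injected so behaviour can be replayed. */
class SimpleTokenBucket {
    private final int capacity;
    private final int refillTokens;
    private final long refillIntervalMs;
    private final LongSupplier clock;
    private long tokens;
    private long lastRefill;

    SimpleTokenBucket(int capacity, int refillTokens, long refillIntervalMs, LongSupplier clock) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillIntervalMs = refillIntervalMs;
        this.clock = clock;
        this.tokens = capacity;              // start full, so an initial burst is allowed
        this.lastRefill = clock.getAsLong();
    }

    synchronized boolean tryAcquire(int required) {
        long now = clock.getAsLong();
        long intervals = (now - lastRefill) / refillIntervalMs;
        if (intervals > 0) {
            tokens = Math.min(capacity, tokens + intervals * refillTokens);
            lastRefill += intervals * refillIntervalMs;   // keep the unused remainder
        }
        if (tokens < required) {
            return false;
        }
        tokens -= required;
        return true;
    }
}
```

With capacity 5 and 2 tokens per 1000 ms, a caller can take 5 tokens immediately (the burst), after which it is held to 2 tokens per second: nothing is granted at t = 1500 ms if the 2 tokens credited at t = 1000 ms were already used.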

4.2 Sliding Window Rate Limiter


// Sliding window rate limiter
@Component
@Slf4j
public class SlidingWindowRateLimiter {
    
    private final Map<String, SlidingWindow> windows = new ConcurrentHashMap<>();
    private final ScheduledExecutorService windowRotator = Executors.newScheduledThreadPool(1);
    
    /**
     * Sliding window configuration
     */
    @Data
    @AllArgsConstructor
    public static class SlidingWindowConfig {
        private int windowSizeMs;      // window size (milliseconds)
        private int numberOfWindows;   // number of sub-windows
        private int threshold;         // request threshold per window
    }
    
    /**
     * Sliding window instance
     */
    private static class SlidingWindow {
        private final String key;
        private final long windowSizeMs;
        private final int numberOfWindows;
        private final int threshold;
        private final long subWindowSizeMs;
        
        private final AtomicReferenceArray<WindowBucket> buckets;
        private volatile long currentWindowStart;
        
        public SlidingWindow(String key, SlidingWindowConfig config) {
            this.key = key;
            this.windowSizeMs = config.getWindowSizeMs();
            this.numberOfWindows = config.getNumberOfWindows();
            this.threshold = config.getThreshold();
            this.subWindowSizeMs = windowSizeMs / numberOfWindows;
            this.buckets = new AtomicReferenceArray<>(numberOfWindows);
            this.currentWindowStart = System.currentTimeMillis() / subWindowSizeMs * subWindowSizeMs;
            
            // Initialize the buckets
            for (int i = 0; i < numberOfWindows; i++) {
                buckets.set(i, new WindowBucket());
            }
        }
        
        public synchronized boolean tryAcquire() {
            long currentTime = System.currentTimeMillis();
            long currentSubWindowStart = currentTime / subWindowSizeMs * subWindowSizeMs;
            
            // Advance the window, clearing slots whose sub-window has expired
            slideWindow(currentSubWindowStart);
            
            // Sum the requests recorded in the current window
            int totalRequests = 0;
            for (int i = 0; i < numberOfWindows; i++) {
                WindowBucket bucket = buckets.get(i);
                if (bucket != null) {
                    totalRequests += bucket.getCount();
                }
            }
            
            // Reject once the threshold has been reached
            if (totalRequests >= threshold) {
                log.debug("Sliding window limit hit: {}, current requests: {}/{}", key, totalRequests, threshold);
                return false;
            }
            
            // Record the request in the ring slot of the current sub-window.
            // slideWindow uses the same index mapping when it clears slots.
            buckets.get(ringIndex(currentSubWindowStart)).increment();
            return true;
        }
        
        // Maps a sub-window start time to its slot in the ring of buckets
        private int ringIndex(long subWindowStart) {
            return (int) ((subWindowStart / subWindowSizeMs) % numberOfWindows);
        }
        
        private void slideWindow(long currentSubWindowStart) {
            long elapsedSubWindows = (currentSubWindowStart - currentWindowStart) / subWindowSizeMs;
            if (elapsedSubWindows <= 0) {
                return;
            }
            
            // Clear every slot the window has moved into: each still holds counts from
            // one full window earlier. After numberOfWindows steps all slots are cleared.
            long steps = Math.min(elapsedSubWindows, numberOfWindows);
            for (long i = 0; i < steps; i++) {
                long enteredStart = currentSubWindowStart - i * subWindowSizeMs;
                buckets.set(ringIndex(enteredStart), new WindowBucket());
            }
            currentWindowStart = currentSubWindowStart;
        }
        
        public int getCurrentCount() {
            int total = 0;
            for (int i = 0; i < numberOfWindows; i++) {
                WindowBucket bucket = buckets.get(i);
                if (bucket != null) {
                    total += bucket.getCount();
                }
            }
            return total;
        }
    }
    
    /**
     * Counter for one sub-window
     */
    private static class WindowBucket {
        private final AtomicInteger count = new AtomicInteger(0);
        
        public void increment() {
            count.incrementAndGet();
        }
        
        public int getCount() {
            return count.get();
        }
        
        public void reset() {
            count.set(0);
        }
    }
    
    public SlidingWindowRateLimiter() {
        // Start the window maintenance task
        windowRotator.scheduleAtFixedRate(this::maintainWindows, 1, 1, TimeUnit.SECONDS);
    }
    
    /**
     * Try to acquire a permit
     */
    public boolean tryAcquire(String key) {
        SlidingWindow window = windows.computeIfAbsent(key, k -> 
            new SlidingWindow(k, new SlidingWindowConfig(60000, 6, 100))); // 1 minute, 6 sub-windows, threshold 100
        
        return window.tryAcquire();
    }
    
    /**
     * Create a custom sliding window
     */
    public void createWindow(String key, SlidingWindowConfig config) {
        SlidingWindow window = new SlidingWindow(key, config);
        windows.put(key, window);
        log.info("Created sliding window: {}, window size: {}ms, threshold: {}", 
            key, config.getWindowSizeMs(), config.getThreshold());
    }
    
    /**
     * Get the current window count
     */
    public int getCurrentCount(String key) {
        SlidingWindow window = windows.get(key);
        return window != null ? window.getCurrentCount() : 0;
    }
    
    /**
     * Maintain windows, evicting those unused for a long time
     */
    private void maintainWindows() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, SlidingWindow>> iterator = windows.entrySet().iterator();
        
        int removedCount = 0;
        while (iterator.hasNext()) {
            Map.Entry<String, SlidingWindow> entry = iterator.next();
            // Windows could be evicted based on last-used time or a similar policy.
            // Simplified here: nothing is actually removed.
        }
        
        if (removedCount > 0) {
            log.info("Sliding window maintenance finished, removed {} windows", removedCount);
        }
    }
    
    @PreDestroy
    public void destroy() {
        windowRotator.shutdown();
    }
}
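The way a ring of sub-window slots approximates a sliding window can be checked with a small, self-contained counter. This is a sketch under assumptions, not the limiter above: the class name is hypothetical, the timestamp is passed in by the caller so the behaviour is reproducible, and each slot remembers which sub-window it belongs to instead of being cleared eagerly.

```java
import java.util.Arrays;

/** Sliding-window counter over a ring of sub-window slots, with a caller-supplied clock. */
class SlidingWindowCounter {
    private final int threshold;
    private final long subWindowMs;
    private final int[] counts;
    private final long[] slotStart;   // start time of the sub-window each slot currently holds

    SlidingWindowCounter(int threshold, long windowMs, int subWindows) {
        this.threshold = threshold;
        this.subWindowMs = windowMs / subWindows;
        this.counts = new int[subWindows];
        this.slotStart = new long[subWindows];
        Arrays.fill(slotStart, -1);
    }

    synchronized boolean tryAcquire(long nowMs) {
        long currentStart = nowMs / subWindowMs * subWindowMs;
        long oldestStart = currentStart - (counts.length - 1) * subWindowMs;

        // Sum only the slots whose sub-window still lies inside the window
        int total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (slotStart[i] >= oldestStart) {
                total += counts[i];
            }
        }
        if (total >= threshold) {
            return false;
        }

        int idx = (int) ((currentStart / subWindowMs) % counts.length);
        if (slotStart[idx] != currentStart) {   // slot holds an old sub-window: reuse it
            slotStart[idx] = currentStart;
            counts[idx] = 0;
        }
        counts[idx]++;
        return true;
    }
}
```

With a limit of 100 per 1000 ms and 10 sub-windows, 100 requests admitted at t = 990 ms still count at t = 1010 ms, so a second burst there is rejected; the capacity only returns once the 900 ms sub-window leaves the window at t = 1900 ms. The precision is bounded by the sub-window size, which is the trade-off against keeping a timestamp per request.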

5. Distributed Rate Limiting

5.1 Redis-Based Distributed Rate Limiting


// Redis distributed rate limiter
@Component
@Slf4j
public class RedisDistributedRateLimiter {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private final String RATE_LIMIT_KEY_PREFIX = "rate_limit:";
    
    /**
     * Fixed window limit backed by Redis
     */
    public boolean fixedWindowAcquire(String key, int threshold, long windowSizeMs) {
        String redisKey = RATE_LIMIT_KEY_PREFIX + key;
        long currentTime = System.currentTimeMillis();
        long windowStart = currentTime / windowSizeMs * windowSizeMs;
        String windowKey = redisKey + ":" + windowStart;
        
        // A Lua script keeps the check-and-increment atomic
        String luaScript = """
            local key = KEYS[1]
            local threshold = tonumber(ARGV[1])
            local windowSize = tonumber(ARGV[2])
            local currentTime = tonumber(ARGV[3])
            
            local count = redis.call('GET', key)
            if count == false then
                redis.call('SET', key, 1, 'PX', windowSize)
                return 1
            else
                if tonumber(count) < threshold then
                    redis.call('INCR', key)
                    return 1
                else
                    return 0
                end
            end
            """;
        
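        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        // Arguments are sent as plain strings: a JSON value serializer may wrap long
        // values with type information that tonumber() in the script cannot parse
        Long result = redisTemplate.execute(script, RedisSerializer.string(),
            new GenericToStringSerializer<Long>(Long.class), Collections.singletonList(windowKey),
            String.valueOf(threshold), String.valueOf(windowSizeMs), String.valueOf(currentTime));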
        
        return result != null && result == 1;
    }
    
    /**
     * Sliding window limit backed by Redis
     */
    public boolean slidingWindowAcquire(String key, int threshold, long windowSizeMs, int numberOfWindows) {
        String redisKey = RATE_LIMIT_KEY_PREFIX + key;
        long currentTime = System.currentTimeMillis();
        long subWindowSizeMs = windowSizeMs / numberOfWindows;
        long currentSubWindowStart = currentTime / subWindowSizeMs * subWindowSizeMs;
        
        // Lua script implementing the sliding window
        String luaScript = """
            local key = KEYS[1]
            local threshold = tonumber(ARGV[1])
            local windowSize = tonumber(ARGV[2])
            local subWindowSize = tonumber(ARGV[3])
            local currentTime = tonumber(ARGV[4])
            local numWindows = tonumber(ARGV[5])
            
            -- Expired sub-windows are not deleted here: every sub-window key gets a TTL
            -- through PEXPIRE below. Deleting keys up to currentTime would wipe the counts
            -- of the window being measured, and the limit would never trigger.
            -- Note: sub-window keys are derived inside the script and are not listed in
            -- KEYS, so this script is not safe to run on Redis Cluster.
            
            -- Sum the requests in the current window
            local total = 0
            local startTime = currentTime - windowSize + subWindowSize
            local subWindowStart = startTime - (startTime % subWindowSize)
            
            while subWindowStart <= currentTime do
                local subKey = key .. ':' .. subWindowStart
                local count = redis.call('GET', subKey)
                if count ~= false then
                    total = total + tonumber(count)
                end
                subWindowStart = subWindowStart + subWindowSize
            end
            
            -- Reject once the threshold has been reached
            if total >= threshold then
                return 0
            end
            
            -- Record the current request
            local currentSubKey = key .. ':' .. (currentTime - (currentTime % subWindowSize))
            redis.call('INCR', currentSubKey)
            redis.call('PEXPIRE', currentSubKey, windowSize + subWindowSize)
            
            return 1
            """;
        
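        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        // Arguments are sent as plain strings: a JSON value serializer may wrap long
        // values with type information that tonumber() in the script cannot parse
        Long result = redisTemplate.execute(script, RedisSerializer.string(),
            new GenericToStringSerializer<Long>(Long.class), Collections.singletonList(redisKey),
            String.valueOf(threshold), String.valueOf(windowSizeMs), String.valueOf(subWindowSizeMs),
            String.valueOf(currentTime), String.valueOf(numberOfWindows));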
        
        return result != null && result == 1;
    }
    
    /**
     * Token bucket limit backed by Redis
     */
    public boolean tokenBucketAcquire(String key, int capacity, int refillTokens, long refillIntervalMs) {
        String redisKey = RATE_LIMIT_KEY_PREFIX + "token_bucket:" + key;
        long currentTime = System.currentTimeMillis();
        
        String luaScript = """
            local key = KEYS[1]
            local capacity = tonumber(ARGV[1])
            local refillTokens = tonumber(ARGV[2])
            local refillInterval = tonumber(ARGV[3])
            local currentTime = tonumber(ARGV[4])
            local tokensRequired = tonumber(ARGV[5])
            
            local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')
            local tokens = tonumber(bucket[1] or capacity)
            local lastRefillTime = tonumber(bucket[2] or currentTime)
            
            -- Refill tokens
            local intervals = math.floor((currentTime - lastRefillTime) / refillInterval)
            if intervals > 0 then
                tokens = math.min(capacity, tokens + intervals * refillTokens)
                -- Advance by whole intervals only, so leftover time counts toward the next refill
                lastRefillTime = lastRefillTime + intervals * refillInterval
            end
            
            -- Not enough tokens: persist the refreshed state and reject
            if tokens < tokensRequired then
                redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', lastRefillTime)
                redis.call('PEXPIRE', key, math.floor(capacity / refillTokens * refillInterval * 2))
                return 0
            end
            
            -- Consume tokens
            tokens = tokens - tokensRequired
            redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', lastRefillTime)
            redis.call('PEXPIRE', key, math.floor(capacity / refillTokens * refillInterval * 2))
            
            return 1
            """;
        
        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(redisKey), 
            capacity, refillTokens, refillIntervalMs, currentTime, 1);
        
        boolean acquired = result != null && result == 1;
        
        if (!acquired) {
            log.debug("Rate limited by Redis token bucket: {}", key);
        }
        
        return acquired;
    }
    
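    /**
     * Returns rate-limit statistics for a key.
     * Placeholder: only the key is populated; the counters still have to be read from Redis.
     */
    public RateLimitStats getRateLimitStats(String key) {
        RateLimitStats stats = new RateLimitStats();
        stats.setKey(RATE_LIMIT_KEY_PREFIX + key);
        
        // TODO: read the counters for this key from Redis and fill in the remaining fields
        return stats;
    }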
    
    @Data
    public static class RateLimitStats {
        private String key;
        private long currentCount;
        private long threshold;
        private long resetTime;
        private double utilization; // utilization ratio
    }
}
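
The refill arithmetic of the token-bucket script can be easier to reason about outside Redis. The following is a minimal in-memory sketch, not the Redis implementation itself: `LocalTokenBucket` and its injectable `clock` are hypothetical names introduced only for illustration. It assumes tokens are granted per whole elapsed interval and that the last-refill timestamp advances by whole intervals, so a partially elapsed interval carries over to the next call.

```java
import java.util.function.LongSupplier;

/** In-memory token bucket for illustration; names and structure are assumptions. */
class LocalTokenBucket {
    private final int capacity;
    private final int refillTokens;
    private final long refillIntervalMs;
    private final LongSupplier clock; // injectable time source, e.g. System::currentTimeMillis
    private long tokens;
    private long lastRefillTime;

    LocalTokenBucket(int capacity, int refillTokens, long refillIntervalMs, LongSupplier clock) {
        if (capacity <= 0 || refillTokens <= 0 || refillIntervalMs <= 0) {
            throw new IllegalArgumentException("capacity, refillTokens and refillIntervalMs must be positive");
        }
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillIntervalMs = refillIntervalMs;
        this.clock = clock;
        this.tokens = capacity;               // a new bucket starts full
        this.lastRefillTime = clock.getAsLong();
    }

    synchronized boolean tryAcquire(int tokensRequired) {
        long now = clock.getAsLong();
        long intervals = (now - lastRefillTime) / refillIntervalMs;
        if (intervals > 0) {
            // Add tokens for each whole interval, capped at the bucket capacity
            tokens = Math.min(capacity, tokens + intervals * refillTokens);
            // Advance by whole intervals only, keeping the remainder for the next call
            lastRefillTime += intervals * refillIntervalMs;
        }
        if (tokens < tokensRequired) {
            return false;
        }
        tokens -= tokensRequired;
        return true;
    }
}
```

With a capacity of 2 and one token per 1000 ms, two immediate requests pass, a third is rejected, and the next request passes only once a full second has elapsed since the last refill point.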
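// Distributed rate limiting configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
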
@Configuration
public class RedisRateLimitConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        
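        // String serializer for keys, JSON serializer for values.
        // Script arguments (ARGV) are written with the value serializer as well, so verify that
        // numeric arguments reach Lua as plain numbers; a StringRedisTemplate with String
        // arguments is a common alternative.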
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        
        template.afterPropertiesSet();
        return template;
    }
    
    @Bean
    public RedisDistributedRateLimiter redisDistributedRateLimiter() {
        return new RedisDistributedRateLimiter();
    }
}
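
The sliding-window script earlier in this section keys each sub-window by `currentTime - (currentTime % subWindowSize)` and rejects a request once the summed count reaches the threshold. The same bookkeeping can be sketched in memory, which makes the behaviour easy to test without Redis. `LocalSlidingWindowLimiter` and its `clock` parameter are hypothetical names, and the sketch assumes the window covers every sub-window whose start is newer than `currentStart - windowSize`; the script's own summation loop may differ in detail.

```java
import java.util.TreeMap;
import java.util.function.LongSupplier;

/** In-memory sliding window built from sub-window counters; illustrative only. */
class LocalSlidingWindowLimiter {
    private final int threshold;
    private final long windowSizeMs;
    private final long subWindowSizeMs;
    private final LongSupplier clock; // injectable time source
    // sub-window start timestamp -> number of accepted requests in that sub-window
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    LocalSlidingWindowLimiter(int threshold, long windowSizeMs, long subWindowSizeMs, LongSupplier clock) {
        if (threshold <= 0 || subWindowSizeMs <= 0 || windowSizeMs < subWindowSizeMs) {
            throw new IllegalArgumentException("invalid limiter settings");
        }
        this.threshold = threshold;
        this.windowSizeMs = windowSizeMs;
        this.subWindowSizeMs = subWindowSizeMs;
        this.clock = clock;
    }

    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        long currentStart = now - (now % subWindowSizeMs);

        // Drop sub-windows that have slid out of the window
        counts.headMap(currentStart - windowSizeMs, true).clear();

        // Sum the accepted requests that are still inside the window
        int total = 0;
        for (int count : counts.values()) {
            total += count;
        }
        if (total >= threshold) {
            return false;
        }

        // Count the current request in its sub-window
        counts.merge(currentStart, 1, Integer::sum);
        return true;
    }
}
```

Because old sub-windows expire one at a time, a burst just before a window boundary still counts against requests that arrive just after it.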

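6. Rate Limiting Monitoring and Operations

6.1 Rate Limiting Monitoring System


// Rate limiting monitoring service
// Imports assume Spring Boot 3 (jakarta.*), Micrometer, and Lombok
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
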
@Component
@Slf4j
public class RateLimitMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final Map<String, RateLimitMetrics> metricsMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService monitorScheduler = Executors.newScheduledThreadPool(1);
    
    /**
     * Per-resource rate limiting metrics.
     * The mutating methods are synchronized because requests are recorded from many threads.
     */
    @Data
    public static class RateLimitMetrics {
        private String resource;
        private long totalRequests;
        private long limitedRequests;
        private long lastLimitedTime;
        private double limitRate; // fraction of requests limited, cumulative since creation
        private long windowStartTime;
        private int currentQps;
        
        public synchronized void recordRequest(boolean limited) {
            totalRequests++;
            if (limited) {
                limitedRequests++;
                lastLimitedTime = System.currentTimeMillis();
            }
            
            limitRate = totalRequests > 0 ? (double) limitedRequests / totalRequests : 0.0;
        }
        
        public synchronized void resetWindow() {
            windowStartTime = System.currentTimeMillis();
            currentQps = 0;
        }
    }
    
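    @PostConstruct
    public void startMonitoring() {
        // Start the periodic task only after dependencies such as meterRegistry have been injected
        monitorScheduler.scheduleAtFixedRate(this::collectMetrics, 30, 30, TimeUnit.SECONDS);
    }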
    
    /**
     * Records one request and whether it was rate limited.
     */
    public void recordRateLimitEvent(String resource, boolean limited) {
        RateLimitMetrics metrics = metricsMap.computeIfAbsent(resource, k -> {
            RateLimitMetrics created = new RateLimitMetrics();
            created.setResource(k);
            // Register the gauge once per resource; it reads the live limit rate on every scrape
            Gauge.builder("ratelimit.rate", created, RateLimitMetrics::getLimitRate)
                .tag("resource", k)
                .register(meterRegistry);
            return created;
        });
        
        metrics.recordRequest(limited);
        
        // Export the request to the metrics system
        Counter.builder("ratelimit.requests")
            .tag("resource", resource)
            .tag("limited", String.valueOf(limited))
            .register(meterRegistry)
            .increment();
    }
    
    /**
     * Periodically checks every tracked resource for anomalies.
     */
    private void collectMetrics() {
        for (Map.Entry<String, RateLimitMetrics> entry : metricsMap.entrySet()) {
            try {
                checkAnomalies(entry.getKey(), entry.getValue());
            } catch (Exception e) {
                // An uncaught exception would cancel all later runs of the scheduled task
                log.error("Rate-limit anomaly check failed for resource {}", entry.getKey(), e);
            }
        }
    }
    
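    /**
     * Checks a resource for abnormal rate-limiting behaviour.
     */
    private void checkAnomalies(String resource, RateLimitMetrics metrics) {
        // Alert when more than 10% of requests are being limited
        if (metrics.getLimitRate() > 0.1) {
            // SLF4J placeholders do not accept format specifiers, so format the percentage first
            String message = String.format("Resource %s has a high limit rate: %.2f%%",
                resource, metrics.getLimitRate() * 100);
            log.warn(message);
            sendAlert("Limit rate alert", message);
        }
        
        // Alert on sustained limiting: over 100 limited requests, the latest within the last minute
        if (metrics.getLimitedRequests() > 100 && 
            System.currentTimeMillis() - metrics.getLastLimitedTime() < 60000) {
            log.warn("Resource {} keeps triggering rate limiting", resource);
            sendAlert("Sustained rate limiting alert", 
                String.format("Resource %s keeps triggering rate limiting", resource));
        }
    }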
    
    /**
     * Builds a rate limiting statistics report.
     */
    public RateLimitReport generateReport() {
        RateLimitReport report = new RateLimitReport();
        report.setTimestamp(Instant.now());
        report.setMetrics(new HashMap<>(metricsMap));
        
        // Compute overall totals
        long totalRequests = metricsMap.values().stream()
            .mapToLong(RateLimitMetrics::getTotalRequests)
            .sum();
        long totalLimited = metricsMap.values().stream()
            .mapToLong(RateLimitMetrics::getLimitedRequests)
            .sum();
        
        report.setTotalRequests(totalRequests);
        report.setTotalLimitedRequests(totalLimited);
        report.setOverallLimitRate(totalRequests > 0 ? (double) totalLimited / totalRequests : 0.0);
        
        // Find the resource with the highest limit rate
        Optional<Map.Entry<String, RateLimitMetrics>> worstResource = metricsMap.entrySet().stream()
            .max(Comparator.comparingDouble(entry -> entry.getValue().getLimitRate()));
        
        worstResource.ifPresent(entry -> {
            report.setWorstResource(entry.getKey());
            report.setWorstLimitRate(entry.getValue().getLimitRate());
        });
        
        return report;
    }
    
    /**
     * Returns the metrics for a single resource.
     */
    public RateLimitMetrics getResourceMetrics(String resource) {
        return metricsMap.get(resource);
    }
    
    private void sendAlert(String title, String message) {
        // Send the alert notification
        log.warn("Sending rate-limit alert: {} - {}", title, message);
        
        // Integrate with an alerting channel here: email, DingTalk, WeCom, etc.
        // alertService.sendAlert(title, message);
    }
    
    @PreDestroy
    public void destroy() {
        monitorScheduler.shutdown();
    }
    
    @Data
    public static class RateLimitReport {
        private Instant timestamp;
        private Map<String, RateLimitMetrics> metrics;
        private long totalRequests;
        private long totalLimitedRequests;
        private double overallLimitRate;
        private String worstResource;
        private double worstLimitRate;
        private List<String> recommendations;
    }
}
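
The `limitRate` tracked by `RateLimitMetrics` is cumulative, so after a long uptime a fresh spike of rejections barely moves the ratio, and an old spike keeps influencing it. One possible refinement is to compute the ratio per collection interval. The sketch below is an assumption-laden illustration rather than part of the monitor above: `WindowedLimitRate` is a hypothetical helper that counts with `LongAdder` and is meant to be read and reset once per scheduled run.

```java
import java.util.concurrent.atomic.LongAdder;

/** Limit rate per collection interval instead of since startup; illustrative only. */
class WindowedLimitRate {
    private final LongAdder total = new LongAdder();
    private final LongAdder limited = new LongAdder();

    /** Called on the request path; LongAdder keeps contention low under many threads. */
    void record(boolean wasLimited) {
        total.increment();
        if (wasLimited) {
            limited.increment();
        }
    }

    /**
     * Returns the limited/total ratio observed since the previous call and resets both counters.
     * The two resets are not one atomic step, so the ratio is clamped to 1.0 in case a
     * concurrent record() lands between them.
     */
    double snapshotAndReset() {
        long totalCount = total.sumThenReset();
        long limitedCount = limited.sumThenReset();
        if (totalCount == 0) {
            return 0.0;
        }
        return Math.min(1.0, (double) limitedCount / totalCount);
    }
}
```

Calling `snapshotAndReset()` from the 30-second task would make the 10% alert threshold react to the most recent interval only; whether that is preferable to the cumulative ratio depends on how noisy the traffic is.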
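// Rate limiting monitoring controller
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
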
@RestController
@RequestMapping("/monitor/ratelimit")
@Slf4j
public class RateLimitMonitorController {
    
    @Autowired
    private RateLimitMonitor rateLimitMonitor;
    
    /**
     * Returns the rate limiting overview.
     */
    @GetMapping("/overview")
    public ResponseEntity<RateLimitMonitor.RateLimitReport> getOverview() {
        RateLimitMonitor.RateLimitReport report = rateLimitMonitor.generateReport();
        return ResponseEntity.ok(report);
    }
    
    /**
     * Returns the metrics for a single resource.
     */
    @GetMapping("/resources/{resource}")
    public ResponseEntity<RateLimitMonitor.RateLimitMetrics> getResourceMetrics(
            @PathVariable String resource) {
        
        RateLimitMonitor.RateLimitMetrics metrics = rateLimitMonitor.getResourceMetrics(resource);
        if (metrics == null) {
            return ResponseEntity.notFound().build();
        }
        
        return ResponseEntity.ok(metrics);
    }
    
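    /**
     * Manually adjusts the rate-limit threshold of a resource.
     */
    @PostMapping("/resources/{resource}/adjust")
    public ResponseEntity<Map<String, Object>> adjustRateLimit(
            @PathVariable String resource,
            @RequestParam int newThreshold) {
        
        Map<String, Object> result = new HashMap<>();
        result.put("resource", resource);
        result.put("newThreshold", newThreshold);
        result.put("timestamp", Instant.now());
        
        // A negative threshold is never valid, so reject it before touching any rule
        if (newThreshold < 0) {
            result.put("status", "rejected");
            return ResponseEntity.badRequest().body(result);
        }
        
        log.info("Manually adjusting rate-limit threshold: {} -> {}", resource, newThreshold);
        
        // Delegate to the dynamic rate-limit manager to apply the new threshold
        // dynamicRateLimitManager.updateThreshold(resource, newThreshold);
        
        result.put("status", "success");
        return ResponseEntity.ok(result);
    }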
}
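// Rate limiting health check
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
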
@Component
public class RateLimitHealthIndicator implements HealthIndicator {
    
    @Autowired
    private RateLimitMonitor rateLimitMonitor;
    
    @Override
    public Health health() {
        try {
            RateLimitMonitor.RateLimitReport report = rateLimitMonitor.generateReport();
            
            // Classify health by the overall limit rate
            if (report.getOverallLimitRate() > 0.3) {
                return Health.down()
                    .withDetail("message", "Limit rate too high")
                    .withDetail("limitRate", report.getOverallLimitRate())
                    .withDetail("worstResource", report.getWorstResource())
                    .build();
            } else if (report.getOverallLimitRate() > 0.1) {
                // Between 10% and 30%: still UP, but above the monitor's 10% alert threshold
                return Health.up()
                    .withDetail("message", "Limit rate elevated")
                    .withDetail("limitRate", report.getOverallLimitRate())
                    .withDetail("totalRequests", report.getTotalRequests())
                    .build();
            } else {
                return Health.up()
                    .withDetail("message", "System running well")
                    .withDetail("limitRate", report.getOverallLimitRate())
                    .build();
            }
            
        } catch (Exception e) {
            return Health.down(e)
                .withDetail("message", "Rate limiting monitor failed")
                .build();
        }
    }
}

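Summary

Rate limiting is a key technique for protecting system stability in a microservice architecture. This guide covered:

Core rate limiting algorithms

  1. Fixed window counter: simple and efficient, but suffers from the boundary problem
  2. Sliding window counter: more precise control that mitigates the boundary problem
  3. Leaky bucket: processes requests at a constant rate, suited to traffic shaping
  4. Token bucket: permits bursts, suited to most scenarios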
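
The boundary problem of the fixed window counter can be reproduced in a few lines. The following is a minimal sketch under stated assumptions: `FixedWindowLimiter` and its injectable `clock` are hypothetical names used only for this illustration, and windows are aligned to multiples of the window size.

```java
import java.util.function.LongSupplier;

/** Fixed window counter used to demonstrate the boundary problem; illustrative only. */
class FixedWindowLimiter {
    private final int limit;
    private final long windowMs;
    private final LongSupplier clock; // injectable time source
    private long windowStart = Long.MIN_VALUE;
    private int count;

    FixedWindowLimiter(int limit, long windowMs, LongSupplier clock) {
        if (limit <= 0 || windowMs <= 0) {
            throw new IllegalArgumentException("limit and windowMs must be positive");
        }
        this.limit = limit;
        this.windowMs = windowMs;
        this.clock = clock;
    }

    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        long start = now - (now % windowMs);
        if (start != windowStart) {
            // A new window begins: the counter resets no matter how recent the last burst was
            windowStart = start;
            count = 0;
        }
        if (count >= limit) {
            return false;
        }
        count++;
        return true;
    }
}
```

With a limit of 100 per 1000 ms, 100 requests at t = 999 ms and another 100 at t = 1000 ms are all accepted, because they fall into different windows. The system therefore sees 200 requests within about one millisecond, twice the configured limit, which is the effect the sliding window counter is designed to mitigate.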

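Implementation layers

  • Application layer: Sentinel annotations and custom rate limiters
  • Middleware layer: Redis-based distributed rate limiting
  • Gateway layer: unified control at the entry point

Production best practices

  • Build a multi-level rate limiting defense
  • Adjust rate limiting thresholds dynamically
  • Establish thorough monitoring and alerting
  • Design graceful responses for limited requests

Rate limiting is not a simple technical switch. It is a piece of systems engineering that has to be tuned to the business characteristics, the system capacity, and the user experience. Done well, it gives a microservice architecture a solid stability guarantee.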
