分布式限流之Redis+Lua实现

简介: 分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使用redis+lua或者nginx+lua技术进行实现,通过这两种技术可以实现的高并发和高性能。本篇我们来使用redis+lua实现时间窗内某个接口的请求数限流。

【转载请注明出处】:https://developer.aliyun.com/article/758604

分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使用redis+lua或者nginx+lua技术进行实现,通过这两种技术可以实现的高并发和高性能。

首先我们来使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后可以改造为限流总并发/请求数和限制总资源数。Lua本身就是一种编程语言,也可以使用它实现复杂的令牌桶或漏桶算法。
因操作是在一个lua脚本中(相当于原子操作),又因Redis是单线程模型,因此是线程安全的。

相比Redis事务来说,Lua脚本有以下优点

  • 减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求,而脚本只需一次即可,减少网络传输;
  • 原子操作:Redis 将整个脚本作为一个原子执行,无需担心并发,也就无需事务;
  • 复用:脚本会永久保存 Redis 中,其他客户端可继续使用。

下面使用SpringBoot项目来进行介绍。

准备Lua 脚本

req_ratelimit.lua

local key = "req.rate.limit:" .. KEYS[1]   --限流KEY
local limitCount = tonumber(ARGV[1])       --限流大小
local limitTime = tonumber(ARGV[2])        --限流时间
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limitCount then --如果超出限流大小
    return 0
else  --请求数+1,并设置1秒过期
    redis.call("INCRBY", key,"1")
    redis.call("expire", key,limitTime)
    return current + 1
end
  • 我们通过KEYS[1] 获取传入的key参数
  • 通过ARGV[1]获取传入的limit参数
  • redis.call方法,从缓存中get和key相关的值,如果为nil那么就返回0
  • 接着判断缓存中记录的数值是否会大于限制大小,如果超出表示该被限流,返回0
  • 如果未超过,那么该key的缓存值+1,并设置过期时间为1秒钟以后,并返回缓存值+1

准备Java项目

pom.xml加入
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

Redis 配置

spring.redis.host=127.0.0.1 
spring.redis.port=6379 
spring.redis.password=
spring.redis.database=0
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=2000
限流注解

注解的目的,是在需要限流的方法上使用

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimiter {

    /**
     * 限流唯一标识
     * @return
     */
    String key() default "";

    /**
     * 限流时间
     * @return
     */
    int time();

    /**
     * 限流次数
     * @return
     */
    int count();

}
lua文件配置及RedisTemplate配置
@Aspect
@Configuration
@Slf4j
public class RateLimiterAspect {


    @Autowired
    private RedisTemplate<String, Serializable> redisTemplate;

    @Autowired
    private DefaultRedisScript<Number> redisScript;

    @Around("execution(* com.sunlands.zlcx.datafix.web ..*(..) )")
    public Object interceptor(ProceedingJoinPoint joinPoint) throws Throwable {

        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Class<?> targetClass = method.getDeclaringClass();
        RateLimiter rateLimit = method.getAnnotation(RateLimiter.class);

        if (rateLimit != null) {
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
            String ipAddress = getIpAddr(request);

            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(ipAddress).append("-")
                    .append(targetClass.getName()).append("- ")
                    .append(method.getName()).append("-")
                    .append(rateLimit.key());

            List<String> keys = Collections.singletonList(stringBuffer.toString());

            Number number = redisTemplate.execute(redisScript, keys, rateLimit.count(), rateLimit.time());

            if (number != null && number.intValue() != 0 && number.intValue() <= rateLimit.count()) {
                log.info("限流时间段内访问第:{} 次", number.toString());
                return joinPoint.proceed();
            }

        } else {
            return joinPoint.proceed();
        }

        throw new RuntimeException("已经到设置限流次数");
    }

    public static String getIpAddr(HttpServletRequest request) {
        String ipAddress = null;
        try {
            ipAddress = request.getHeader("x-forwarded-for");
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("WL-Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getRemoteAddr();
            }
            // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
            if (ipAddress != null && ipAddress.length() > 15) {
                // "***.***.***.***".length()= 15
                if (ipAddress.indexOf(",") > 0) {
                    ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
                }
            }
        } catch (Exception e) {
            ipAddress = "";
        }
        return ipAddress;
    }


}
控制层
@RestController
@Slf4j
@RequestMapping("limit")
public class RateLimiterController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping(value = "/test")
    @RateLimiter(key = "test", time = 10, count = 1)
    public ResponseEntity<Object> test() {

        String date = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
        RedisAtomicInteger limitCounter = new RedisAtomicInteger("limitCounter", redisTemplate.getConnectionFactory());
        String str = date + " 累计访问次数:" + limitCounter.getAndIncrement();
        log.info(str);
        return ResponseEntity.ok(str);
    }
}

启动项目进行测试

不断访问url http://127.0.0.1:8090/limit/test,效果如下:
image.png

image.png

我这里为了简单演示是直接抛了一个RuntimeException,实际可以单独定义一个如RateLimitException,在上层直接处理这种频次限制的异常,以友好的方式返回给用户。

【转载请注明出处】:https://developer.aliyun.com/article/758604

相关文章
|
9月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
577 2
|
9月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
670 6
|
10月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
8月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
665 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
8月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
10月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
343 8
|
11月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2972 7
|
12月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
892 3
|
12月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
505 0
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1610 0
分布式爬虫框架Scrapy-Redis实战指南