redis事件监听的应用场景与案例实战

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: redis事件监听的应用场景与案例实战

什么是事件监听

在使用Redis的过程中,我们对Redis做的每一个操作,下发的每一个命令, 都可以认为是事件的存在。所谓事件监听,就是Redis Server会对客户端下发命令进行一个监控, 一但有人对Redis Server做操作, Redis Server都能知道,并通过某种方式将监听到的事件转发到对应的订阅者。


应用场景

需求一:

一个电商商家后台,商家可以设置多个商品的价格并指定价格的生效时间。后台服务需要生效时间到时对所有已经上架的商品进行价格修改。并在价格修改成功后通知所有关注该商品的买家客户。


注意: 假设该平台拥有1w商家,平均每个商家设置了100个商品,也就是你要保证200w件商品价格修改的实时通知性。


解决方案一: 每个商品都有一份表去记录所有的新价格和生效时间,由定时任务job去轮询表中的数据,如果符合当前时间则取出并执行接下来的业务逻辑。

170862c89436907b~tplv-t2oaga2asx-zoom-in-crop-mark_4536_0_0_0.png

解决方案二: 每个商品都有一份表去记录所有的新价格和生效时间,由多个分布式job去轮询表中的数据,为了减轻job服务实例的压力,设置每2秒执行一次(定时任务不建议设置每秒)。在这基础上其实还有优化的空间,可以在设置分布式job分片处理逻辑。对于每一个job实例,还可以在其内部开启异步线程并行处理。

170863342cffa94c~tplv-t2oaga2asx-zoom-in-crop-mark_4536_0_0_0.png

从上述的描述中我们可以发现,用户量还是比较大,其实实时性要求比较高,所以如果我们把数据落库,然后每次定时的时候从数据库里面去取然后做逻辑的判断,这样肯定是无法达到实时性的要求的,所以有一种方案是采用redis来管理这批数据。但是也有两个个问题

  1. 当这批数据过期的时候,要提醒用户
  2. 从redis删除后,要修改数据库的状态。

要解决这个功能就需要使用到redis的一个高级的功能:redis 键空间通知(供Keyspace Notifications功能)其允许客户Publish / Subscribe ,以便以某种方式接收影响Redis数据集的事件。


需求二:

同样是电商平台,商家可以设置商品的预售时间, 当预售时间到达时,修改商品状态,并上架商品。该需求和需求一类似,都是以时间或者秒作为计算依据,每个商品都是独立的,它们的时间属性都不会一样,所以是没有规律性的。


需求三:

订单超时30分钟自动关闭。(不管多少订单,都是固定的时间间隔30分钟,有规律)

这个问题解决的方案就有多种了,我们可以通过MQ来进行,现在大多的MQ都带有死信队列的机制,我们可以通过这个机制来完成,其次也可以通过quartz的轮询方式的完成,选择合适解决方案应对当前的需求即可。当然本次主要是解决第一个需求,所以只谈如何使用redis来解决。


需求四:

  • 监控key的操作(set、del、expire……)
  • 监听key的过期,自动触发事件


如何使用Keyspace Notifications

由于Keyspace Notifications是在Redis 2.8.0之后的版本才提供的功能,所以我们的Redis版本需要再2.8.0之上,否则无法使用Redis时间监听,在笔者写这篇文章之时,Redis的最新正式版本已经为5.0


修改Redis配置,开启Keyspace Notifications的两种方式

  • 命令修改

CONFIG set notify-keyspace-events AKEx

  • 配置文件修改
    修改配置文件redis.conf, notify-keyspace-events AKEx,重新启动Redis


参数说明

1)notify-keyspace-events选项的参数为空字符串时,表示功能关闭,当参数不是空字符串时,表示功能开启      

2)notify-keyspace-events默功能是关闭的            

3)如果要使用此功能,必须字符串包含 K 或者 E,否则收不到任何事件消息          

4)如果参数为“AKE”,意味着接收所有事件消息          

notify-keyspace-events 的参数可以是以下字符的任意组合, 它指定了服务器该发送哪些类型的通知:          

字符 发送的通知
K 键空间通知,所有通知以 keyspace@ 为前缀
E 键事件通知,所有通知以 keyevent@ 为前缀
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
A 参数 glshzxe(evict)maxmemoryAglshzxe 的别名


实例演示

同时监听 set、get、del 、 expire 操作

170860359e940b7c~tplv-t2oaga2asx-zoom-in-crop-mark_4536_0_0_0.gif

注意:get 操作监听不到消息,set,del ,expire 如果操作成功可以监听到消息,如果操作失败也监听不到消息.

更多命令参考

# 以keyevent订阅库0上的set、get、del、expire多个事件
subscribe __keyevent@0__:set __keyevent@0__:get __keyevent@0__:del __keyevent@0__:expire
# 以keyspace订阅库0上关于key为mykey的所有事件
subscribe __keyspace@0__:mykey

模式匹配则使用psubscribe

# 以keyspace订阅库0上关于key为mykey:*的所有事件
psubscribe __keyspace@0__:mykey:*
# 以keyevent、keyspace订阅所有库上的所有事件
psubscribe __key*@*__:*

程序实战

使用技术Spring Boot + RedisTemplate

  • Redis监听类 RedisExpiredListener
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class RedisExpiredListener implements MessageListener {
    public final static String LISTENER_PATTERN = "__key*@*__:*";
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 建议使用: valueSerializer
        String body = new String(message.getBody());
        String channel = new String(message.getChannel());
        System.out.println("onMessage >> " + String.format("channel: %s, body: %s, bytes: %s"
                , channel, body, new String(bytes)));
        if (body.startsWith("product:")) {
            final String productId = body.replace("product:", "");
            System.out.println("得到产品id:" + productId);
        }
    }
}

启动类 RedisExpiredApplication

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner {
    @Autowired
    private RedisTemplate redisTemplate;
    public static void main(String[] args) {
        SpringApplication.run(RedisExpiredApplication.class, args);
    }
    @Bean
    @Primary
    public RedisTemplate redisTemplate() {
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        return redisTemplate;
    }
    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置Redis的连接工厂
        container.setConnectionFactory(redisConnection);
        // 设置监听使用的线程池
        container.setTaskExecutor(executor);
        // 设置监听的Topic: PatternTopic/ChannelTopic
        Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
        // 设置监听器
        container.addMessageListener(new RedisExpiredListener(), topic);
        return container;
    }
    @Bean
    public Executor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("V-Thread");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    @Override
    public void run(String... strings) throws Exception {
        redisTemplate.opsForValue().set("orderId:123", "过期了是取不到的", 5, TimeUnit.SECONDS);
        System.out.println("初始化设置 key 过期时间 5s");
        System.out.println("main 线程休眠10秒");
        Thread.sleep(10 * 1000);
        System.out.println("main 线程休眠结束:获取key orderId结果为:" + redisTemplate.opsForValue().get("orderId:123"));
    }
  • 配置文件:application.properties
spring.redis.database=0
spring.redis.host=192.168.104.102
spring.redis.port=6378
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1

效果展示:

170861edf2c8e106~tplv-t2oaga2asx-zoom-in-crop-mark_4536_0_0_0.gif

因为redis key 过期之后,其中的value是无法获取到的, 所以在设计key的时候就包含了业务主键id在其中,以此来解决value消失无法处理业务逻辑的情况。到这里,就可以根据具体到期时间执行具体逻辑了。

Redis过期命令设置

# Redis Expire 命令用于设置 key 的过期时间。key 过期后将不再可用。
Expire KEY_NAME TIME_IN_SECONDS
# Redis Expireat 命令用于以 UNIX 时间戳(unix timestamp)格式设置 key 的过期时间。key 过期后将不再可用。
Expireat KEY_NAME TIME_IN_UNIX_TIMESTAMP
# Redis PEXPIREAT 命令用于设置 key 的过期时间,已毫秒计。key 过期后将不再可用。
PEXPIREAT KEY_NAME TIME_IN_MILLISECONDS_IN_UNIX_TIMESTAMP

注意事项

因为 Redis 目前的订阅与发布功能采取的是 发送即忘(fire and forget) 策略, 所以如果你的程序需要可靠事件通知(reliable notification of events), 那么目前的键空间通知可能并不适合你:当订阅事件的客户端(服务实例)断线时, 它会丢失所有在断线期间分发给它的事件。并不能确保消息送达。未来有计划允许更可靠的事件传递,但可能这将在更基础的层面上解决,或者为Pub / Sub本身带来可靠性,或者允许Lua脚本拦截Pub / Sub消息来执行诸如推送将事件列入清单。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
目录
相关文章
|
6月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
548 0
分布式爬虫框架Scrapy-Redis实战指南
|
3月前
|
缓存 监控 NoSQL
Redis 实操要点:Java 最新技术栈的实战解析
本文介绍了基于Spring Boot 3、Redis 7和Lettuce客户端的Redis高级应用实践。内容包括:1)现代Java项目集成Redis的配置方法;2)使用Redisson实现分布式可重入锁与公平锁;3)缓存模式解决方案,包括布隆过滤器防穿透和随机过期时间防雪崩;4)Redis数据结构的高级应用,如HyperLogLog统计UV和GeoHash处理地理位置。文章提供了详细的代码示例,涵盖Redis在分布式系统中的核心应用场景,特别适合需要处理高并发、分布式锁等问题的开发场景。
222 39
|
2月前
|
NoSQL Java Redis
Redis基本数据类型及Spring Data Redis应用
Redis 是开源高性能键值对数据库,支持 String、Hash、List、Set、Sorted Set 等数据结构,适用于缓存、消息队列、排行榜等场景。具备高性能、原子操作及丰富功能,是分布式系统核心组件。
286 2
|
3月前
|
NoSQL 网络协议 Java
【Azure Redis】Redis服务端的故障转移(Failover)导致客户端应用出现15分钟超时问题的模拟及解决
在使用 Azure Cache for Redis 服务时,因服务端维护可能触发故障转移。Linux 环境下使用 Lettuce SDK 会遇到超时 15 分钟的已知问题。本文介绍如何通过重启 Primary 节点主动复现故障转移,并提供多种解决方案,包括调整 TCP 设置、升级 Lettuce 版本、配置 TCP_USER_TIMEOUT 及使用其他 SDK(如 Jedis)来规避此问题。
105 1
|
3月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
828 7
|
4月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
288 3
|
4月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
194 0
|
缓存 NoSQL 数据库
【redis】redis应用场景,缓存的各种问题
redis有一个重要的应用领域——缓存 引用来自网友的图解释缓存在架构中的位置 默认情况下,我们的服务架构如下图,客户端请求service,然后service去读取mysql数据库 问题存在于,数据库性能不够用,数据库是整个架构中最重要的一个环节,它在高并发,高写入频次的时候非常容易崩掉,这是一般的数据库本身的特性所决定的,它们的架构模式注定了不可以承受较大的并发量,所以就有了缓存: service与高速的缓存进行交互,如果缓存中有数据直接返回客户端,如果没有才会从MySql中去查询。
17446 0
|
4月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
4月前
|
缓存 NoSQL Java
Redis+Caffeine构建高性能二级缓存
大家好,我是摘星。今天为大家带来的是Redis+Caffeine构建高性能二级缓存,废话不多说直接开始~
573 0