玩转redis-延时消息队列

简介:

上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。

看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个valuescore保存的是时间,也就是说,在添加一个元素时他的score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是ZRANGEBYSCORElistpop不同,pop是取出元素并且会把元素在list中删除。ZRANGEBYSCORE只会取出数据不会把数据从sorted set中删除。解决方法1,利用redis事务,先ZRANGEBYSCORE取出数据,然后再用ZREMRANGEBYSCORE 把数据删除。

具体实现-code

添加延时消息,参数delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    if delay <= 0 {
        return errors.New("delay need great than zero")
    }
    tm := time.Now().Add(delay)
    msg := NewMessage("", body)
    msg.DelayTime = tm.Unix()

    sendData, _ := json.Marshal(msg)
    return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
}

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get delay message.")
            ticker.Stop()
        }()
        topicName := s.topicName + zsetSuffix
        for {
            currentTime := time.Now().Unix()
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var valuesCmd *redis.ZSliceCmd
                _, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
                    valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
                    pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
                    return nil
                })
                if err != nil {
                    log.Printf("zset pip error: %#v \n", err)
                    continue
                }
                rev := valuesCmd.Val()
                for _, revBody := range rev {
                    msg := &Message{}
                    json.Unmarshal([]byte(revBody.Member.(string)), msg)
                    if s.handler != nil {
                        s.handler.HandleMessage(msg)
                    }
                }
            }
        }
    }()
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
85 6
|
4月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
5月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
261 3
|
2月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
55 6
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
109 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
32 2
|
3月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
69 1
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
53 0
|
5月前
|
消息中间件 监控 UED
【揭秘消息队列背后的秘密!】如何解决消息队列的延时及过期失效问题?深入剖析与实战指南!
【8月更文挑战第24天】本文以随笔形式探讨了消息队列在实际应用中面临的消息延时及过期失效问题。针对消息延时,文章提出了包括优化消息队列配置、提高消费者效率和利用优先级队列在内的解决方案;并通过示例代码展示了如何优化RabbitMQ中的消费者处理流程。对于消息过期失效问题,则建议设置消息TTL、采用死信队列并实施监控报警机制;同样提供了基于RabbitMQ设置消息TTL的具体实现。这些策略有助于提升消息队列的性能和系统的整体稳定性。
70 2
|
4月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。