开发者社区> lpxxn> 正文

玩转redis-延时消息队列

简介:
+关注继续查看

上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。

看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个valuescore保存的是时间,也就是说,在添加一个元素时他的score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是ZRANGEBYSCORElistpop不同,pop是取出元素并且会把元素在list中删除。ZRANGEBYSCORE只会取出数据不会把数据从sorted set中删除。解决方法1,利用redis事务,先ZRANGEBYSCORE取出数据,然后再用ZREMRANGEBYSCORE 把数据删除。

具体实现-code

添加延时消息,参数delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    if delay <= 0 {
        return errors.New("delay need great than zero")
    }
    tm := time.Now().Add(delay)
    msg := NewMessage("", body)
    msg.DelayTime = tm.Unix()

    sendData, _ := json.Marshal(msg)
    return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
}

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get delay message.")
            ticker.Stop()
        }()
        topicName := s.topicName + zsetSuffix
        for {
            currentTime := time.Now().Unix()
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var valuesCmd *redis.ZSliceCmd
                _, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
                    valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
                    pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
                    return nil
                })
                if err != nil {
                    log.Printf("zset pip error: %#v \n", err)
                    continue
                }
                rev := valuesCmd.Val()
                for _, revBody := range rev {
                    msg := &Message{}
                    json.Unmarshal([]byte(revBody.Member.(string)), msg)
                    if s.handler != nil {
                        s.handler.HandleMessage(msg)
                    }
                }
            }
        }
    }()
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
探索FreeRTOS的功能:线程,消息队列,邮箱,信号量,互斥量,任务通知,延时,虚拟定时器
探索FreeRTOS的功能:线程,消息队列,邮箱,信号量,互斥量,任务通知,延时,虚拟定时器
155 0
redis灵魂拷问:如何使用stream实现消息队列
redis灵魂拷问:如何使用stream实现消息队列
210 0
redis 实现消息队列及常用命令(三)|学习笔记
快速学习 redis 实现消息队列及常用命令(三)
118 0
基于Redis实现消息队列
基于Redis实现消息队列
181 0
【Redis】浅尝Redis Stream做消息队列
SpringBoot整合Redis5.0新特性Redis Stream
404 0
SpringBoot整合Redis实现消息队列
SpringBoot整合Redis实现消息队列
192 0
别再用 Redis List 实现消息队列了,Stream 专为队列而生
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
242 0
Redis 竟然能用 List 实现消息队列
今天,码哥结合消息队列的特点一步步带大家分析使用 Redis 的 List 作为消息队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合运用到项目中。
295 0
Redis和消息队列使用实战
 消息队列是在乐视这边非常普遍使用的技术。在我们部门内部,不同的项目使用的消息队列实现也不一样。下面是支付系统的流转图
104 0
AspNetCore结合Redis实践消息队列
这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。
240 0
+关注
lpxxn
山外青山,楼外楼。
文章
问答
文章排行榜
最热
最新
相关电子书
更多
消息队列 Kafka 版差异化特性
立即下载
消息队列kafka介绍
立即下载
企业互联网架构之消息队列
立即下载