玩转redis-简单消息队列

简介:

使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {
    HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
目录
相关文章
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
337 6
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
298 0
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
剖析 Redis List 消息队列的三种消费线程模型
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
234 2
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
338 0
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
785 0
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
485 0
|
消息中间件 NoSQL Redis
Redis 消息队列介绍
Redis的消息队列使用简单,没有什么配置,比ActiveMQ要轻量级太多,当然功能也比较简单,如果只需要简单的订阅以及发布,可以考虑使用它。 **订阅操作** 命令为:subscribe [channel] [channel] ..,如【代码1】所示,即成功订阅频道[redis.blog]。 **发布操作** 命令为publish [channel] [message],
4574 0
|
12月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
7月前
|
缓存 负载均衡 监控
135_负载均衡:Redis缓存 - 提高缓存命中率的配置与最佳实践
在现代大型语言模型(LLM)部署架构中,缓存系统扮演着至关重要的角色。随着LLM应用规模的不断扩大和用户需求的持续增长,如何构建高效、可靠的缓存架构成为系统性能优化的核心挑战。Redis作为业界领先的内存数据库,因其高性能、丰富的数据结构和灵活的配置选项,已成为LLM部署中首选的缓存解决方案。
734 25