玩转redis-简单消息队列

简介:

使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {
    HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
88 6
|
6月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
83 0
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
113 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
37 2
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
59 0
|
5月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
92 0
|
6月前
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
151 0
|
22天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
166 85
|
20天前
|
缓存 监控 NoSQL
Redis经典问题:缓存穿透
本文详细探讨了分布式系统和缓存应用中的经典问题——缓存穿透。缓存穿透是指用户请求的数据在缓存和数据库中都不存在,导致大量请求直接落到数据库上,可能引发数据库崩溃或性能下降。文章介绍了几种有效的解决方案,包括接口层增加校验、缓存空值、使用布隆过滤器、优化数据库查询以及加强监控报警机制。通过这些方法,可以有效缓解缓存穿透对系统的影响,提升系统的稳定性和性能。
|
2月前
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题