开发者社区> lpxxn> 正文

玩转redis-简单消息队列

简介:
+关注继续查看

使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {
    HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
redis灵魂拷问:如何使用stream实现消息队列
redis灵魂拷问:如何使用stream实现消息队列
210 0
基于Redis实现消息队列
基于Redis实现消息队列
178 0
【Redis】浅尝Redis Stream做消息队列
SpringBoot整合Redis5.0新特性Redis Stream
404 0
SpringBoot整合Redis实现消息队列
SpringBoot整合Redis实现消息队列
192 0
别再用 Redis List 实现消息队列了,Stream 专为队列而生
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
241 0
Redis 竟然能用 List 实现消息队列
今天,码哥结合消息队列的特点一步步带大家分析使用 Redis 的 List 作为消息队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合运用到项目中。
294 0
Redis和消息队列使用实战
 消息队列是在乐视这边非常普遍使用的技术。在我们部门内部,不同的项目使用的消息队列实现也不一样。下面是支付系统的流转图
104 0
AspNetCore结合Redis实践消息队列
这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。
235 0
Redis消息队列发展历程
Redis是目前最受欢迎的kv类数据库,当然它的功能越来越多,早已不限定在kv场景,消息队列就是Redis中一个重要的功能。Redis从2010年发布1.0版本就具备一个消息队列的雏形,随着10多年的迭代,其消息队列的功能也越来越完善,作为一个全内存的消息队列,适合应用与要求高吞吐、低延时的场景。本文将来盘一下Redis消息队列功能的发展历程,历史版本有哪些不足,后续版本是如何来解决这些问题的。
1154 0
Redis消息队列发展历程
Redis消息队列功能一直在发展和完善,从list、pubsub、stream到Tair持久内存版stream,这篇文章盘点Redis消息队列不同实现的优缺点以及Tair持久内存版对消息队列功能的改进和未来发展方向。
741 0
+关注
lpxxn
山外青山,楼外楼。
文章
问答
文章排行榜
最热
最新
相关电子书
更多
高并发分布式缓存Redis6.0
立即下载
阿里云Redis服务助力游戏行业发展--王义成
立即下载
国内开发者与Redis开源社区的发展
立即下载