把Redis当作队列来用,真的合适吗?中

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 把Redis当作队列来用,真的合适吗?中

趋于成熟的队列:Stream

我们来看 Stream 是如何解决上面这些问题的。

我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

XADD:发布消息

XREAD:读取消息

生产者发布 2 条消息:

// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

e252df8cb9e029a8730461def4b44a4.png

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2) Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

XGROUP:创建消费者组

XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

同样地,第二个消费组开始消费:

// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

1d97b1b7a46341fc4f4409adb9411ec.png

3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

115cec0b34f28b64415ea79a30865b9.png

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"

4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5) 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

生产者限流:避免消费者处理不及时,导致持续积压

丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

到这里,就不得不把 Redis 与专业的队列中间件做对比了。

下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
别再用 Redis List 实现消息队列了,Stream 专为队列而生
148 0
|
4月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
244 3
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
46 6
|
2月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
59 1
|
3月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
|
7月前
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
267 1
|
NoSQL Redis
redis队列
redis队列
47 0
|
消息中间件 NoSQL Java
Redis实现延迟队列,我研究了两种方案,发现并不简单
前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了。
|
NoSQL Go Redis
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库
513 0
|
NoSQL Go Redis
Redis与异步队列
使用Redis可以很方便地实现异步队列。
102 0