趋于成熟的队列:Stream
我们来看 Stream 是如何解决上面这些问题的。
我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?
首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:
XADD:发布消息
XREAD:读取消息
生产者发布 2 条消息:
// *表示让Redis自动生成消息ID 127.0.0.1:6379> XADD queue * name zhangsan "1618469123380-0" 127.0.0.1:6379> XADD queue * name lisi "1618469127777-0"
使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。
这个消息 ID 的格式是「时间戳-自增序号」。
消费者拉取消息:
// 从开头读取5条消息,0-0表示从开头读取 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0 1) 1) "queue" 2) 1) 1) "1618469123380-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618469127777-0" 2) 1) "name" 2) "lisi"
如果想继续拉取消息,需要传入上一条消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0 (nil)
没有消息,Redis 会返回 NULL。
以上就是 Stream 最简单的生产、消费。
这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。
下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?
1) Stream 是否支持「阻塞式」拉取消息?
可以的,在读取消息时,只需要增加 BLOCK 参数即可。
// BLOCK 0 表示阻塞等待,不设置超时时间 127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。
2) Stream 是否支持发布 / 订阅模式?
也没问题,Stream 通过以下命令完成发布订阅:
XGROUP:创建消费者组
XREADGROUP:在指定消费组下,开启消费者拉取消息
下面我们来看具体如何做?
首先,生产者依旧发布 2 条消息:
127.0.0.1:6379> XADD queue * name zhangsan "1618470740565-0" 127.0.0.1:6379> XADD queue * name lisi "1618470743793-0"
之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:
// 创建消费者组1,0-0表示从头拉取消息 127.0.0.1:6379> XGROUP CREATE queue group1 0-0 OK // 创建消费者组2,0-0表示从头拉取消息 127.0.0.1:6379> XGROUP CREATE queue group2 0-0 OK
消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。
第一个消费组开始消费:
// group1的consumer开始消费,>表示拉取最新数据 127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
同样地,第二个消费组开始消费:
// group2的consumer开始消费,>表示拉取最新数据 127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。
这样一来,就达到了多组消费者「订阅」消费的目的。
3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?
除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。
当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。
// group1下的 1618472043089-0 消息已处理完成 127.0.0.1:6379> XACK queue group1 1618472043089-0
如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。
待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。
// 消费者重新上线,0-0表示重新拉取未ACK的消息 127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0 // 之前没消费成功的数据,依旧可以重新消费 1) 1) "queue" 2) 1) 1) "1618472043089-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618472045158-0" 2) 1) "name" 2) "lisi"
4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?
Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。
我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
5) 消息堆积时,Stream 是怎么处理的?
其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:
生产者限流:避免消费者处理不及时,导致持续积压
丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息
而 Redis 在实现 Stream 时,采用了第 2 个方案。
在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
// 队列长度最大10000 127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan "1618473015018-0"
当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。
好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?
既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?
但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。
原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。
到这里,就不得不把 Redis 与专业的队列中间件做对比了。
下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?