Redis进阶-Stream多播的可持久化的消息队列

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: Redis进阶-Stream多播的可持久化的消息队列


Pre

Redis-13Redis发布订阅 中提到了PubSub的不足之处 。

PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。

如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。

如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。

正是因为 PubSub 有这些缺点,它几乎找不到合适的应用场景。Redis5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了持久化消息队列,从此 PubSub 可以消失了。


Stream简介

Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列。 Redis Stream 借鉴了 Kafka 的设计。


Stream特性

  • Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容
  • 消息是持久化的,Redis 重启后,内容还在
  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,首次使用 xadd 指令追加消息时自动创建
  • 每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id 在 Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了
  • 每个消费组都有一个 Stream内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。
  • 每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响,即同一份Stream 内部的消息会被每个消费组都消费到
  • 同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。
  • 消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL ( Pending Entries List),这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如 1587877430819-3,它表示当前的消息在毫米时间戳 1587877430819时产生,并且是该毫秒内产生的第 3条消息。

消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。


消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。


命令预览

Redis Version _ 5.0.3

  • xadd 追加消息
  • xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • xrange 获取消息列表,会自动过滤已经删除的消息
  • xlen 消息长度
  • del 删除 Stream
  • 。。。。
#  * 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
127.0.0.1:6379> XADD artisankey * name fonia sex female
"1587877430819-0"   --->  生成的消息 ID
127.0.0.1:6379> XADD artisankey * name jeff sex male
"1587877454849-0"
 # XLEN   消息长度
127.0.0.1:6379> XLEN artisankey
(integer) 2
 # -表示最小值 , + 表示最大值
127.0.0.1:6379> xrange artisankey - +    
1) 1) "1587877430819-0"
   2) 1) "name"
      2) "fonia"
      3) "sex"
      4) "female"
2) 1) "1587877454849-0"
   2) 1) "name"
      2) "jeff"
      3) "sex"
      4) "male"
# 指定最大消息 ID 的列表
127.0.0.1:6379> XRANGE artisankey - 1587877430819-0
1) 1) "1587877430819-0"
   2) 1) "name"
      2) "fonia"
      3) "sex"
      4) "female"
# 再加入一条数据,格式任意 多了个age字段
127.0.0.1:6379> XADD artisankey * name jeff sex male  age 20
"1587877808930-0"
127.0.0.1:6379> xrange artisankey - +
1) 1) "1587877430819-0"
   2) 1) "name"
      2) "fonia"
      3) "sex"
      4) "female"
2) 1) "1587877454849-0"
   2) 1) "name"
      2) "jeff"
      3) "sex"
      4) "male"
3) 1) "1587877808930-0"
   2) 1) "name"
      2) "jeff"
      3) "sex"
      4) "male"
      5) "age"
      6) "20"
# 删除掉刚才新加的这条数据
127.0.0.1:6379> XDEL artisankey 1587877808930-0
(integer) 1
# 长度变为2
127.0.0.1:6379> XLEN artisankey
(integer) 2
# 被删除的消息已经没了
127.0.0.1:6379> XRANGE artisankey - +
1) 1) "1587877430819-0"
   2) 1) "name"
      2) "fonia"
      3) "sex"
      4) "female"
2) 1) "1587877454849-0"
   2) 1) "name"
      2) "jeff"
      3) "sex"
      4) "male"
127.0.0.1:6379> 
# 删除整个 Stream
127.0.0.1:6379> DEL artisankey
(integer) 1
127.0.0.1:6379> XRANGE artisankey - +
(empty list or set)
127.0.0.1:6379> 

独立消费

我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。

Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。

使用 xread 时,我们可以完全忽略消费组 (Consumer Group)的存在,就好比 Stream 就是一个普通的列表 (list)。

演示一下

#先通过xadd向artisan这个队列写入5条数据
127.0.0.1:6379> XADD artisan * name artisan1 age 25
"1587886601587-0"
127.0.0.1:6379> XADD artisan * name artisan2 age 26
"1587886610449-0"
127.0.0.1:6379> XADD artisan * name artisan3 age 27
"1587886617014-0"
127.0.0.1:6379> XADD artisan * name artisan4 age 28
"1587886622590-0"
127.0.0.1:6379> XADD artisan * name artisan6 age 25
"1587886631932-0"
127.0.0.1:6379> XLEN artisan
(integer) 5
127.0.0.1:6379> 
# 从 Stream 头部读取两条消息
127.0.0.1:6379> XREAD COUNT  2 STREAMS  artisan 0-0
1) 1) "artisan"
   2) 1) 1) "1587886601587-0"
         2) 1) "name"
            2) "artisan1"
            3) "age"
            4) "25"
      2) 1) "1587886610449-0"
         2) 1) "name"
            2) "artisan2"
            3) "age"
            4) "26"
127.0.0.1:6379> 
# 从 Stream 尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> XREAD COUNT 1 STREAMS artisan $
(nil)
127.0.0.1:6379> 
# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> XREAD block 0 count 1 streams artisan $

# 重新打开一个窗口,在这个窗口往 Stream 里塞消息
127.0.0.1:6379> XADD artisan * name artisan7 age 27
"1587886999018-0"
127.0.0.1:6379> 

# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了 136.42s
127.0.0.1:6379> XREAD block 0 count 1 streams artisan $
1) 1) "artisan"
   2) 1) 1) "1587886999018-0"
         2) 1) "name"
            2) "artisan7"
            3) "age"
            4) "27"
(136.42s)
127.0.0.1:6379> 

  • 客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。
  • block 0 表示永远阻塞,直到消息到来,block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil
127.0.0.1:6379> XREAD count 1 block 1000 streams artisan $
(nil)
(1.03s)
127.0.0.1:6379> 

创建消费组

Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量

# 表示从头开始消费
127.0.0.1:6379> XGROUP create artisan artisanGroup 0-0
OK
127.0.0.1:6379> 
# $ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
127.0.0.1:6379> XGROUP create artisan artisanGroup2 $
OK
127.0.0.1:6379> 
127.0.0.1:6379> XINFO stream artisan
 1) "length"
 2) (integer) 6  # 共 6 个消息
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 2   # 两个消费组
 9) "last-generated-id"
10) "1587886999018-0"
11) "first-entry"   # 第一个消息
12) 1) "1587886601587-0"
    2) 1) "name"
       2) "artisan1"
       3) "age"
       4) "25"
13) "last-entry" # 最后一个消息
14) 1) "1587886999018-0"
    2) 1) "name"
       2) "artisan7"
       3) "age"
       4) "27"
127.0.0.1:6379> 
# 获取 Stream 的消费组信息
127.0.0.1:6379> XINFO groups artisan
1) 1) "name"
   2) "artisanGroup"
   3) "consumers"
   4) (integer) 0  # 该消费组还没有消费者
   5) "pending"
   6) (integer) 0  # 该消费组没有正在处理的消息
   7) "last-delivered-id"
   8) "0-0"
2) 1) "name"
   2) "artisanGroup2"
   3) "consumers"
   4) (integer) 0  # 该消费组还没有消费者
   5) "pending"
   6) (integer) 0  # 该消费组没有正在处理的消息
   7) "last-delivered-id"
   8) "1587886999018-0"

消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。

它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

先看下目前队列中的数据

127.0.0.1:6379> XRANGE artisan - +
1) 1) "1587886601587-0"
   2) 1) "name"
      2) "artisan1"
      3) "age"
      4) "25"
2) 1) "1587886610449-0"
   2) 1) "name"
      2) "artisan2"
      3) "age"
      4) "26"
3) 1) "1587886617014-0"
   2) 1) "name"
      2) "artisan3"
      3) "age"
      4) "27"
4) 1) "1587886622590-0"
   2) 1) "name"
      2) "artisan4"
      3) "age"
      4) "28"
5) 1) "1587886631932-0"
   2) 1) "name"
      2) "artisan6"
      3) "age"
      4) "25"
6) 1) "1587886999018-0"
   2) 1) "name"
      2) "artisan7"
      3) "age"
      4) "27"
127.0.0.1:6379> 
# > 号表示从当前消费组的 last_delivered_id 后面开始读
# 每当消费者读取一条消息,last_delivered_id 变量就会前进
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886601587-0"
         2) 1) "name"
            2) "artisan1"
            3) "age"
            4) "25"
127.0.0.1:6379> 
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886610449-0"
         2) 1) "name"
            2) "artisan2"
            3) "age"
            4) "26"
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886617014-0"
         2) 1) "name"
            2) "artisan3"
            3) "age"
            4) "27"
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886622590-0"
         2) 1) "name"
            2) "artisan4"
            3) "age"
            4) "28"
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886631932-0"
         2) 1) "name"
            2) "artisan6"
            3) "age"
            4) "25"
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587886999018-0"
         2) 1) "name"
            2) "artisan7"
            3) "age"
            4) "27"
# 再继续读取,就没有新消息了
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 count 1 streams artisan >
(nil)
127.0.0.1:6379> 
# 那就阻塞等待吧
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 block 0  count 1 streams artisan >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> XADD artisan * name artisan8 age 28
"1587889553099-0"
127.0.0.1:6379> 

# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> XREADGROUP group  artisanGroup artisanGroup2 block 0  count 1 streams artisan >
1) 1) "artisan"
   2) 1) 1) "1587889553099-0"
         2) 1) "name"
            2) "artisan8"
            3) "age"
            4) "28"
(37.63s)
127.0.0.1:6379> 

# 观察消费组信息
127.0.0.1:6379> XINFO groups artisan
1) 1) "name"
   2) "artisanGroup"
   3) "consumers"
   4) (integer) 1   # 一个消费者
   5) "pending"
   6) (integer) 7  # 共 7 条正在处理的信息还有没有 ack
   7) "last-delivered-id"
   8) "1587889553099-0"
2) 1) "name"
   2) "artisanGroup2"
   3) "consumers"
   4) (integer) 0  # 消费组 artisanGroup2没有任何变化,因为前面我们一直在操纵 artisanGroup
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1587886999018-0"
127.0.0.1:6379> 
# 如果同一个消费组有多个消费者,我们可以通过 xinfo consumers 指令观察每个消费者的状态
127.0.0.1:6379> XINFO consumers artisan artisanGroup
1) 1) "name"
   2) "artisanGroup2"
   3) "pending"
   4) (integer) 7   # 共 7 条待处理消息
   5) "idle"
   6) (integer) 179922  # 空闲了多长时间 ms 没有读取消息了
127.0.0.1:6379> XINFO consumers artisan artisanGroup2
(empty list or set)
127.0.0.1:6379> 
# 接下来我们 ack 一条消息
127.0.0.1:6379> XACK artisan artisanGroup 1587886601587-0  
(integer) 1
127.0.0.1:6379> XINFO consumers artisan artisanGroup
1) 1) "name"
   2) "artisanGroup2"
   3) "pending"
   4) (integer) 6  # 变成了 6 条待处理的消息
   5) "idle"
   6) (integer) 359659
127.0.0.1:6379> 
# 下面 ack 所有消息
127.0.0.1:6379> XACK artisan artisanGroup 1587886610449-0 1587886617014-0 1587886622590-0 1587886631932-0 1587886999018-0  1587889553099-0 
(integer) 6
127.0.0.1:6379> XINFO consumers artisan artisanGroup
1) 1) "name"
   2) "artisanGroup2"
   3) "pending"
   4) (integer) 0 # pel 空了
   5) "idle"
   6) (integer) 528547
127.0.0.1:6379> 

Stream 消息积压怎么处理

消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel指令又不会删除消息,它只是给消息做了个标志位。

Redis考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度

127.0.0.1:6379> xlen artisan
(integer) 7
127.0.0.1:6379> xadd artisan maxlen 3 * name artisan89 age 90
"1587890235506-0"
127.0.0.1:6379> XLEN artisan
(integer) 3
127.0.0.1:6379> 

我们看到 Stream 的长度被砍掉了,通过指定 maxlen,仅保留了 maxlen的长度数据。


消息如果忘记 ACK 会怎样?

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大


PEL 如何避免消息丢失?

在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了.

但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息ID 不能为参数>,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的PEL 消息以及自 last_delivered_id 之后的新消息。


Stream 的高可用

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。


分区 Partition

Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。

Kafka 是原生支持 Partition 的,但也是客户端做的。Kafka 的客户端存在 HashStrategy ,因为它也是通过客户端的 hash 算法来将不同的消息塞入不同分区

的。

另外,Kafka 还支持动态增加分区数量的能力,但是这种调整能力也是很蹩脚的,它不会把之前已经存在的内容进行 rehash,不会重新分区历史数据。这种简单的动态调整的能力Redis Stream 通过增加新的 Stream 就可以做到。


小结

Stream 的消费模型借鉴了 Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。

数据结构 RadixTree

参考:radix tree,基数树

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
74 6
|
22天前
|
消息中间件 NoSQL Redis
Redis Stream
10月更文挑战第20天
26 2
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
96 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
27 2
|
1月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
37 0
|
2月前
|
消息中间件 NoSQL 中间件
19)消息队列的终极解决方案 Stream
19)消息队列的终极解决方案 Stream
42 0
|
6月前
|
NoSQL Redis
03- Redis的数据持久化策略有哪些 ?
Redis的数据持久化包括两种策略:RDB(全量快照)和AOF(增量日志)。RDB在指定时间间隔将内存数据集保存到磁盘,而AOF记录所有写操作形成日志。从Redis 4.0开始,支持RDB和AOF的混合持久化,通过设置`aof-use-rdb-preamble yes`。
56 1
|
4月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
|
5月前
|
存储 缓存 JSON
Redis-持久化-淘汰机制-IO策略
Redis-持久化-淘汰机制-IO策略
|
6月前
|
存储 NoSQL 关系型数据库
Redis持久化策略AOF、RDB详解及源码分析
Redis持久化策略AOF、RDB详解及源码分析