Redis Stream消息队列之基本语法与使用方式

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。

前言

本文的主角是Redis Stream,它是Redis5.0版本新增加的数据结构,主要用于消息队列,提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失,功能颇为强大。其实,Redis本身是有一个Redis发布订阅来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃。所以,Redis的发布订阅像是Vue2的Bus或Vue3的Mitt,属于后端版的事件总线。此外,Redis本身的List和Sorted Set也可以实现,但是也有各自的缺点,如List没有消息多播功能,没有ACK机制,无法重复消费等,Sorted Set不支持阻塞式获取消息、不允许重复消费、不支持分组。相比之下,Redis Stream明显胜出。

一、消息队列相关命令

1.XADD - 添加消息到末尾

(1)语法格式:

XADD key ID field value

(2)参数:

  • key:队列名称,如果不存在就创建
  • ID:消息ID,我们使用*表示由redis生成,可以自定义,但是要自己保证递增性。
  • field value:记录。

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703235642574-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703235648454-0"
127.0.0.1:6379[15]>

2.XLEN - 获取流包含的元素数量,即消息长度

(1)语法格式:

XLEN key

(2)参数:

  • key:队列名称。

(3)示例:

127.0.0.1:6379[15]> XLEN MQ
(integer) 2

3.XRANGE - 获取消息列表,会自动过滤已经删除的消息

(1)语法格式:

XRANGE key start end [COUNT count]

(2)参数:

  • key:队列名
  • start:开始值,- 表示最小值
  • end:结束值,+ 表示最大值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
2) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

4.XREVRANGE - 反向获取消息列表,ID从大到小

(1)语法格式:

XREVRANGE key end start [COUNT count]

(2)参数:

  • key:队列名
  • end:结束值,+ 表示最大值
  • start:开始值,- 表示最小值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XREVRANGE MQ + - COUNT 2
1) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"
2) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"

5.XDEL - 删除消息

(1)语法格式:

XDEL key ID [ID ...]

(2)参数:

  • key:队列名称
  • ID:消息 ID

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Kakarotto sex male age 18
"1703238230846-0"
127.0.0.1:6379[15]> XADD MQ * name Android18 sex female age 18
"1703238306386-0"
127.0.0.1:6379[15]> XDEL MQ 1703238230846-0
(integer) 1
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
2) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"
3) 1) "1703238306386-0"
   2) 1) "name"
      2) "Android18"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

6.XTRIM - 对流进行修剪,限制长度

(1)语法格式:

XTRIM key MAXLEN [~] count

(2)参数:

  • key:队列名称
  • MAXLEN:Stream中最大消息数量(即保留的消息数量)
  • [~]:若使用了 ~ 符号,则表示限制的是消息的大小而非数量。
  • count:需要删除的消息数量

(3)示例:

# 限制MQ最多1条消息,其余删除
127.0.0.1:6379[15]> XTRIM MQ MAXLEN 1
(integer) 2
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703238306386-0"
   2) 1) "name"
      2) "Android18"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

7.XREAD - 以阻塞或非阻塞方式读取一个或多个队列的消息

(1)语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

(2)参数:

  • count:数量,默认值为10
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key:队列名
  • id:指定读取的起始位置,可以是特定的消息ID,也可以是"$"表示最新的消息,或者是"0"表示从最早的消息开始读取。

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703297838206-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703297844215-0"
# 读取MQ最早的默认条消息
127.0.0.1:6379[15]> XREAD STREAMS MQ 0
1) 1) "MQ"
   2) 1) 1) "1703238306386-0"
         2) 1) "name"
            2) "Android18"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"
      2) 1) "1703297838206-0"
         2) 1) "name"
            2) "Vegeta"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
      3) 1) "1703297844215-0"
         2) 1) "name"
            2) "Bulma"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"

# 读取MQ第二条消息,需指定第二条消息的ID
127.0.0.1:6379[15]> XREAD STREAMS MQ 1703297838206-0
1) 1) "MQ"
   2) 1) 1) "1703297844215-0"
         2) 1) "name"
            2) "Bulma"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"

# 读取MQ最新的一条消息,需开启阻塞,阻塞时长为10s。如果10s内未读取到消息则退出阻塞。
【客户端A】127.0.0.1:6379[15]> XREAD BLOCK 100000 STREAMS MQ $
1) 1) "MQ"
   2) 1) 1) "1703300894359-0"
         2) 1) "name"
            2) "Ranchi"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
(2.04s)
【客户端A】127.0.0.1:6379[15]>

# 另开一个终端向MQ队列中写入一条消息,阻塞读的终端就能接收到消息。
root@帅龍之龍:~# redis-cli -h 127.0.0.1 -p 6379 -a 123456
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
【客户端B】127.0.0.1:6379> select 15
OK
【客户端B】127.0.0.1:6379[15]> XADD MQ * name Ranchi sex male age 18
"1703300894359-0"
【客户端B】127.0.0.1:6379[15]>

(4)注意:XREAD存在消息漏读的风险,当正在处理一条消息时,又有多条消息到达,此时读取的是最新那条!

二、消费者组相关命令

消费者组将多个消费者划分到一个组中,监听同一个队列,具有消息分流、消息标示、消息确认的特点。
·消息分流:分流给组内的不同消费者,不会重复消费,反而加快消费
·消息标示:消费者组会记录最后一个被处理的消息,确保每一个消息都会被消费
·消息确认:消费者获取消息后,消息处于pending状态,然后将其存入pending-list列表,当处理完成后,通过XACK确认消息,将消息标记为已处理,然后从pending-list被移除

1.XGROUP CREATE - 创建消费者组

(1)语法格式:

XGROUP CREATE key group id|$

(2)参数:

  • key:队列名称,如果不存在就创建起始ID
  • group:消费者组名
  • id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息

(3)示例:

# 创建一个从队列第一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupA 0
OK
# 创建一个从队列最后一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK

2.XGROUP CREATECONSUMER - 在指定的消费者组中添加消费者

(1)语法格式:

XGROUP CREATECONSUMER key group consumer

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • consumer:消费者名

(3)示例:

127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer1
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer2
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer3
(integer) 1

3.XINFO STREAM - 打印流信息

(1)语法格式:

XINFO STREAM key

(2)参数:

  • key:队列名称

(3)示例:

127.0.0.1:6379[15]> XINFO STREAM MQ
 1) "length"
 2) (integer) 5
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1703300894359-0"
 9) "max-deleted-entry-id"
10) "1703238230846-0"
11) "entries-added"
12) (integer) 8
13) "recorded-first-entry-id"
14) "1703238306386-0"
15) "groups"
16) (integer) 2
17) "first-entry"
18) 1) "1703238306386-0"
    2) 1) "name"
       2) "Android18"
       3) "sex"
       4) "female"
       5) "age"
       6) "18"
19) "last-entry"
20) 1) "1703300894359-0"
    2) 1) "name"
       2) "Ranchi1"
       3) "sex"
       4) "male"
       5) "age"
       6) "18"

4.XINFO GROUPS - 打印消费者组的信息

(1)语法格式:

XINFO GROUPS key

(2)参数:

  • key:队列名称

(3)示例:

127.0.0.1:6379[15]> XINFO GROUPS MQ
1)  1) "name"
    2) "mqGroupA"
    3) "consumers"
    4) (integer) 3
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "0-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 5
2)  1) "name"
    2) "mqGroupB"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1703300894359-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

5.XGROUP DELCONSUMER - 在指定的消费者组中删除消费者

(1)语法格式:

XGROUP DELCONSUMER key group consumer

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • consumer:消费者名

(3)示例:

127.0.0.1:6379[15]> XGROUP DELCONSUMER MQ mqGroupA consumer3
(integer) 0

6.XGROUP DESTROY - 删除指定的消费者组

(1)语法格式:

XGROUP DESTROY key group

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名

(3)示例:

127.0.0.1:6379[15]> XGROUP DESTROY MQ mqGroupB
(integer) 1

7.XGROUP SETID - 为消费者组设置新的最后递送消息ID

(1)语法格式:

XGROUP SETID key group id|$

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息

(3)示例:

127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK
127.0.0.1:6379[15]> XGROUP SETID MQ mqGroupB $
OK

8.XREADGROUP GROUP - 读取消费者组中的消息

(1)语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

(2)参数:

  • group:消费者组名
  • consumer:消费者名
  • count:读取数量
  • milliseconds:阻塞毫秒数
  • key:队列名
  • id:起始ID,>代表从下一条未消费的消息开始,0代表从pending-list中第一条消息开始,其它根据指定id从pending-list中获取已消费但未确认的消息开始

(3)示例:

# 指定消费者组的消费者去读取下一条未消费的消息
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer1 COUNT 1 STREAMS MQ >
1) 1) "MQ"
   2) 1) 1) "1703238306386-0"
         2) 1) "name"
            2) "Android18"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"
127.0.0.1:6379[15]>
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer2 COUNT 1 STREAMS MQ >
1) 1) "MQ"
   2) 1) 1) "1703297838206-0"
         2) 1) "name"
            2) "Vegeta"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
127.0.0.1:6379[15]>

(4)注意:若某个消费者,消费了某条消息,但是并没有处理成功时(如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了!

9.XPENDING - 显示待处理消息的相关信息

(1)语法格式:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

(2)参数:

  • key:队列名
  • group:消费者组名
  • start:开始值,-表示最小值
  • end:结束值,+表示最大值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 2 # 已读取但未处理的消息数
2) "1703238306386-0" # 起始消息ID
3) "1703297838206-0" # 结束消息ID
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupB
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

10.XACK - 将消息标记为"已处理"

(1)语法格式:

XACK key group id [id ...]

(2)参数:

  • key:队列名
  • group:消费者组名
  • id:消息ID

(3)示例:

127.0.0.1:6379[15]> XACK MQ mqGroupA 1703238306386-0
(integer) 1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]>

11.XCLAIM - 转移消息的归属权

(1)语法格式:

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]

(2)参数:

  • key:队列名
  • group:消费者组名
  • consumer:消费者名
  • min-idle-time:从被读取到未处理的时间
  • id:消息ID

(3)示例:

# 在指定的消费者组中,将cosumer2已读取5分钟(300秒,300000毫秒),但未处理的`1703297838206-0`消息转移给consumer1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 3600000 1703297838206-0
(empty array) # 转移不成功
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 300000 1703297838206-0
1) 1) "1703297838206-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer1"
      2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA - + 10
1) 1) "1703297838206-0"
   2) "consumer1"
   3) (integer) 1171095 # IDLE被重置了
   4) (integer) 2 # 读取次数被+1

(4)说明:某个消费者读取了消息但没有处理,这时消费者宕机或重启等就会导致该消息失踪。那么就需要该消息转移给其他的消费者处理,就是消息转移。转移除了要指定ID外,还需要指定min-idle-time最小空闲时间,该值要小于消息从被读取到未处理的时间。

三、消息队列的帮助命令

(1)语法格式:

HELP XXX

(2)参数:

  • XXX:命令关键字

(3)示例:

127.0.0.1:6379[15]> help XADD

  XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
  summary: Appends a new message to a stream. Creates the key if it doesn't exist.
  since: 5.0.0
  group: stream

127.0.0.1:6379[15]> HELP XGROUP

  XGROUP (null)
  summary: A container for consumer groups commands.
  since: 5.0.0
  group: stream

  XGROUP CREATE key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]
  summary: Creates a consumer group.
  since: 5.0.0
  group: stream

  XGROUP CREATECONSUMER key group consumer
  summary: Creates a consumer in a consumer group.
  since: 6.2.0
  group: stream

  XGROUP DELCONSUMER key group consumer
  summary: Deletes a consumer from a consumer group.
  since: 5.0.0
  group: stream

  XGROUP DESTROY key group
  summary: Destroys a consumer group.
  since: 5.0.0
  group: stream

  XGROUP HELP (null)
  summary: Returns helpful text about the different subcommands.
  since: 5.0.0
  group: stream

  XGROUP SETID key group id|$ [ENTRIESREAD entries-read]
  summary: Sets the last-delivered ID of a consumer group.
  since: 5.0.0
  group: stream

127.0.0.1:6379[15]>
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
13天前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
50 6
|
1月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
84 20
剖析 Redis List 消息队列的三种消费线程模型
|
14天前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
20 2
|
14天前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
29 0
|
27天前
|
消息中间件 NoSQL 中间件
19)消息队列的终极解决方案 Stream
19)消息队列的终极解决方案 Stream
37 0
|
17天前
|
存储 缓存 NoSQL
数据的存储--Redis缓存存储(一)
数据的存储--Redis缓存存储(一)
53 1
|
17天前
|
存储 缓存 NoSQL
数据的存储--Redis缓存存储(二)
数据的存储--Redis缓存存储(二)
33 2
数据的存储--Redis缓存存储(二)
|
1月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
18天前
|
缓存 NoSQL 关系型数据库
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
本文深入探讨了Redis缓存的相关知识,包括缓存的概念、使用场景、可能出现的问题(缓存预热、缓存穿透、缓存雪崩、缓存击穿)及其解决方案。
90 0
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿