前言
本文的主角是Redis Stream,它是Redis5.0版本新增加的数据结构,主要用于消息队列,提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失,功能颇为强大。其实,Redis本身是有一个Redis发布订阅来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃。所以,Redis的发布订阅像是Vue2的Bus或Vue3的Mitt,属于后端版的事件总线。此外,Redis本身的List和Sorted Set也可以实现,但是也有各自的缺点,如List没有消息多播功能,没有ACK机制,无法重复消费等,Sorted Set不支持阻塞式获取消息、不允许重复消费、不支持分组。相比之下,Redis Stream明显胜出。
一、消息队列相关命令
1.XADD - 添加消息到末尾
(1)语法格式:
XADD key ID field value
(2)参数:
- key:队列名称,如果不存在就创建
- ID:消息ID,我们使用*表示由redis生成,可以自定义,但是要自己保证递增性。
- field value:记录。
(3)示例:
127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703235642574-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703235648454-0"
127.0.0.1:6379[15]>
2.XLEN - 获取流包含的元素数量,即消息长度
(1)语法格式:
XLEN key
(2)参数:
- key:队列名称。
(3)示例:
127.0.0.1:6379[15]> XLEN MQ
(integer) 2
3.XRANGE - 获取消息列表,会自动过滤已经删除的消息
(1)语法格式:
XRANGE key start end [COUNT count]
(2)参数:
- key:队列名
- start:开始值,- 表示最小值
- end:结束值,+ 表示最大值
- count:数量
(3)示例:
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
2) 1) "1703235648454-0"
2) 1) "name"
2) "Bulma"
3) "sex"
4) "female"
5) "age"
6) "18"
4.XREVRANGE - 反向获取消息列表,ID从大到小
(1)语法格式:
XREVRANGE key end start [COUNT count]
(2)参数:
- key:队列名
- end:结束值,+ 表示最大值
- start:开始值,- 表示最小值
- count:数量
(3)示例:
127.0.0.1:6379[15]> XREVRANGE MQ + - COUNT 2
1) 1) "1703235648454-0"
2) 1) "name"
2) "Bulma"
3) "sex"
4) "female"
5) "age"
6) "18"
2) 1) "1703235642574-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
5.XDEL - 删除消息
(1)语法格式:
XDEL key ID [ID ...]
(2)参数:
- key:队列名称
- ID:消息 ID
(3)示例:
127.0.0.1:6379[15]> XADD MQ * name Kakarotto sex male age 18
"1703238230846-0"
127.0.0.1:6379[15]> XADD MQ * name Android18 sex female age 18
"1703238306386-0"
127.0.0.1:6379[15]> XDEL MQ 1703238230846-0
(integer) 1
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
2) 1) "1703235648454-0"
2) 1) "name"
2) "Bulma"
3) "sex"
4) "female"
5) "age"
6) "18"
3) 1) "1703238306386-0"
2) 1) "name"
2) "Android18"
3) "sex"
4) "female"
5) "age"
6) "18"
6.XTRIM - 对流进行修剪,限制长度
(1)语法格式:
XTRIM key MAXLEN [~] count
(2)参数:
- key:队列名称
- MAXLEN:Stream中最大消息数量(即保留的消息数量)
- [~]:若使用了
~
符号,则表示限制的是消息的大小而非数量。 - count:需要删除的消息数量
(3)示例:
# 限制MQ最多1条消息,其余删除
127.0.0.1:6379[15]> XTRIM MQ MAXLEN 1
(integer) 2
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703238306386-0"
2) 1) "name"
2) "Android18"
3) "sex"
4) "female"
5) "age"
6) "18"
7.XREAD - 以阻塞或非阻塞方式读取一个或多个队列的消息
(1)语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
(2)参数:
- count:数量,默认值为10
- milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式
- key:队列名
- id:指定读取的起始位置,可以是特定的消息ID,也可以是"$"表示最新的消息,或者是"0"表示从最早的消息开始读取。
(3)示例:
127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703297838206-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703297844215-0"
# 读取MQ最早的默认条消息
127.0.0.1:6379[15]> XREAD STREAMS MQ 0
1) 1) "MQ"
2) 1) 1) "1703238306386-0"
2) 1) "name"
2) "Android18"
3) "sex"
4) "female"
5) "age"
6) "18"
2) 1) "1703297838206-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
3) 1) "1703297844215-0"
2) 1) "name"
2) "Bulma"
3) "sex"
4) "female"
5) "age"
6) "18"
# 读取MQ第二条消息,需指定第二条消息的ID
127.0.0.1:6379[15]> XREAD STREAMS MQ 1703297838206-0
1) 1) "MQ"
2) 1) 1) "1703297844215-0"
2) 1) "name"
2) "Bulma"
3) "sex"
4) "female"
5) "age"
6) "18"
# 读取MQ最新的一条消息,需开启阻塞,阻塞时长为10s。如果10s内未读取到消息则退出阻塞。
【客户端A】127.0.0.1:6379[15]> XREAD BLOCK 100000 STREAMS MQ $
1) 1) "MQ"
2) 1) 1) "1703300894359-0"
2) 1) "name"
2) "Ranchi"
3) "sex"
4) "male"
5) "age"
6) "18"
(2.04s)
【客户端A】127.0.0.1:6379[15]>
# 另开一个终端向MQ队列中写入一条消息,阻塞读的终端就能接收到消息。
root@帅龍之龍:~# redis-cli -h 127.0.0.1 -p 6379 -a 123456
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
【客户端B】127.0.0.1:6379> select 15
OK
【客户端B】127.0.0.1:6379[15]> XADD MQ * name Ranchi sex male age 18
"1703300894359-0"
【客户端B】127.0.0.1:6379[15]>
(4)注意:XREAD存在消息漏读的风险,当正在处理一条消息时,又有多条消息到达,此时读取的是最新那条!
二、消费者组相关命令
消费者组将多个消费者划分到一个组中,监听同一个队列,具有消息分流、消息标示、消息确认的特点。
·消息分流:分流给组内的不同消费者,不会重复消费,反而加快消费
·消息标示:消费者组会记录最后一个被处理的消息,确保每一个消息都会被消费
·消息确认:消费者获取消息后,消息处于pending状态,然后将其存入pending-list列表,当处理完成后,通过XACK确认消息,将消息标记为已处理,然后从pending-list被移除
1.XGROUP CREATE - 创建消费者组
(1)语法格式:
XGROUP CREATE key group id|$
(2)参数:
- key:队列名称,如果不存在就创建起始ID
- group:消费者组名
- id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息
(3)示例:
# 创建一个从队列第一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupA 0
OK
# 创建一个从队列最后一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK
2.XGROUP CREATECONSUMER - 在指定的消费者组中添加消费者
(1)语法格式:
XGROUP CREATECONSUMER key group consumer
(2)参数:
- key:队列名称,如果不存在就创建
- group:消费者组名
- consumer:消费者名
(3)示例:
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer1
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer2
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer3
(integer) 1
3.XINFO STREAM - 打印流信息
(1)语法格式:
XINFO STREAM key
(2)参数:
- key:队列名称
(3)示例:
127.0.0.1:6379[15]> XINFO STREAM MQ
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1703300894359-0"
9) "max-deleted-entry-id"
10) "1703238230846-0"
11) "entries-added"
12) (integer) 8
13) "recorded-first-entry-id"
14) "1703238306386-0"
15) "groups"
16) (integer) 2
17) "first-entry"
18) 1) "1703238306386-0"
2) 1) "name"
2) "Android18"
3) "sex"
4) "female"
5) "age"
6) "18"
19) "last-entry"
20) 1) "1703300894359-0"
2) 1) "name"
2) "Ranchi1"
3) "sex"
4) "male"
5) "age"
6) "18"
4.XINFO GROUPS - 打印消费者组的信息
(1)语法格式:
XINFO GROUPS key
(2)参数:
- key:队列名称
(3)示例:
127.0.0.1:6379[15]> XINFO GROUPS MQ
1) 1) "name"
2) "mqGroupA"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 5
2) 1) "name"
2) "mqGroupB"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1703300894359-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
5.XGROUP DELCONSUMER - 在指定的消费者组中删除消费者
(1)语法格式:
XGROUP DELCONSUMER key group consumer
(2)参数:
- key:队列名称,如果不存在就创建
- group:消费者组名
- consumer:消费者名
(3)示例:
127.0.0.1:6379[15]> XGROUP DELCONSUMER MQ mqGroupA consumer3
(integer) 0
6.XGROUP DESTROY - 删除指定的消费者组
(1)语法格式:
XGROUP DESTROY key group
(2)参数:
- key:队列名称,如果不存在就创建
- group:消费者组名
(3)示例:
127.0.0.1:6379[15]> XGROUP DESTROY MQ mqGroupB
(integer) 1
7.XGROUP SETID - 为消费者组设置新的最后递送消息ID
(1)语法格式:
XGROUP SETID key group id|$
(2)参数:
- key:队列名称,如果不存在就创建
- group:消费者组名
- id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息
(3)示例:
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK
127.0.0.1:6379[15]> XGROUP SETID MQ mqGroupB $
OK
8.XREADGROUP GROUP - 读取消费者组中的消息
(1)语法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
(2)参数:
- group:消费者组名
- consumer:消费者名
- count:读取数量
- milliseconds:阻塞毫秒数
- key:队列名
- id:起始ID,>代表从下一条未消费的消息开始,0代表从pending-list中第一条消息开始,其它根据指定id从pending-list中获取已消费但未确认的消息开始
(3)示例:
# 指定消费者组的消费者去读取下一条未消费的消息
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer1 COUNT 1 STREAMS MQ >
1) 1) "MQ"
2) 1) 1) "1703238306386-0"
2) 1) "name"
2) "Android18"
3) "sex"
4) "female"
5) "age"
6) "18"
127.0.0.1:6379[15]>
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer2 COUNT 1 STREAMS MQ >
1) 1) "MQ"
2) 1) 1) "1703297838206-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
127.0.0.1:6379[15]>
(4)注意:若某个消费者,消费了某条消息,但是并没有处理成功时(如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了!
9.XPENDING - 显示待处理消息的相关信息
(1)语法格式:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
(2)参数:
- key:队列名
- group:消费者组名
- start:开始值,-表示最小值
- end:结束值,+表示最大值
- count:数量
(3)示例:
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 2 # 已读取但未处理的消息数
2) "1703238306386-0" # 起始消息ID
3) "1703297838206-0" # 结束消息ID
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupB
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
10.XACK - 将消息标记为"已处理"
(1)语法格式:
XACK key group id [id ...]
(2)参数:
- key:队列名
- group:消费者组名
- id:消息ID
(3)示例:
127.0.0.1:6379[15]> XACK MQ mqGroupA 1703238306386-0
(integer) 1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
2) "1"
127.0.0.1:6379[15]>
11.XCLAIM - 转移消息的归属权
(1)语法格式:
XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
(2)参数:
- key:队列名
- group:消费者组名
- consumer:消费者名
- min-idle-time:从被读取到未处理的时间
- id:消息ID
(3)示例:
# 在指定的消费者组中,将cosumer2已读取5分钟(300秒,300000毫秒),但未处理的`1703297838206-0`消息转移给consumer1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
2) "1"
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 3600000 1703297838206-0
(empty array) # 转移不成功
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 300000 1703297838206-0
1) 1) "1703297838206-0"
2) 1) "name"
2) "Vegeta"
3) "sex"
4) "male"
5) "age"
6) "18"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer1"
2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA - + 10
1) 1) "1703297838206-0"
2) "consumer1"
3) (integer) 1171095 # IDLE被重置了
4) (integer) 2 # 读取次数被+1
(4)说明:某个消费者读取了消息但没有处理,这时消费者宕机或重启等就会导致该消息失踪。那么就需要该消息转移给其他的消费者处理,就是消息转移。转移除了要指定ID外,还需要指定min-idle-time最小空闲时间,该值要小于消息从被读取到未处理的时间。
三、消息队列的帮助命令
(1)语法格式:
HELP XXX
(2)参数:
- XXX:命令关键字
(3)示例:
127.0.0.1:6379[15]> help XADD
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
summary: Appends a new message to a stream. Creates the key if it doesn't exist.
since: 5.0.0
group: stream
127.0.0.1:6379[15]> HELP XGROUP
XGROUP (null)
summary: A container for consumer groups commands.
since: 5.0.0
group: stream
XGROUP CREATE key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]
summary: Creates a consumer group.
since: 5.0.0
group: stream
XGROUP CREATECONSUMER key group consumer
summary: Creates a consumer in a consumer group.
since: 6.2.0
group: stream
XGROUP DELCONSUMER key group consumer
summary: Deletes a consumer from a consumer group.
since: 5.0.0
group: stream
XGROUP DESTROY key group
summary: Destroys a consumer group.
since: 5.0.0
group: stream
XGROUP HELP (null)
summary: Returns helpful text about the different subcommands.
since: 5.0.0
group: stream
XGROUP SETID key group id|$ [ENTRIESREAD entries-read]
summary: Sets the last-delivered ID of a consumer group.
since: 5.0.0
group: stream
127.0.0.1:6379[15]>