本文已收录于专栏
《Redis精通系列》
上千人点赞收藏,全套Redis学习资料,大厂必备技能!
目录
1、简介
2、Stream内部探索
2.1 Stream 结构
2.2 四个唯一
2.3 消息ID
2.4 消息内容
3、Stream指令
3.1 指令汇总
3.2 XADD
3.2 XTRIM
3.3 XDEL
3.4 XLEN
3.5 XRANGE
3.6 XREVRANGE
3.7 XREAD
3.8 XGROUP CREATE
3.9 XREADGROUP GROUP
3.10 XACK
3.11 XPENDING
3.11 XCLAIM
3.13 XINFO
4、关于Stream优化内存的事情
1、简介
Stream弥补了Redis作为MQ(message queue)技术选型上的不足之处;Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合sentinel或cluster使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司MQ的技术选型产品,但是关于Stream的使用和特性(消费组),仍值得一探究竟。
Stream对标消息队列,因此几乎具备了MQ所有的特性,以下列出Stream所具有的部分特性:
消息顺序存储
消息ID序列化规则生成
消息的遍历
消息阻塞/非阻塞式获取
客户端分组消费消息
消息确认机制
消息异常机制
消息队列监控
在文中也会说到Stream的这些特性。
2、Stream内部探索
2.1 Stream 结构
在探索Stream的内部结构之前,先看一张清晰的Stream结构图:
如下是关于上图的名词解析:
Message Content:消息内容
Consumer group:消费组,通过XGROUP CREATE 命令创建,一个消费组可以有多个消费者
Last_delivered_id:游标,每个消费组有一个游标,任意消费者读取消息后,游标都会向前移动
Consumer:消费者,消费组中的消费者
Pending_ids:状态变量,每个消费者会有一个状态变量,用于记录被当前消费者读取,但是并未ack的消息id
2.2 四个唯一
Stream内部维护了一个消息链表,以此使得消息能够具有队列的特性。在Stream中有四个唯一需要了解:
每个Stream都具有唯一的名称
每个消息(Message)都具有一个由系统分配或者客户端指定唯一ID
每个Stream中的消费组(Consumer_Group)具有唯一名称
每个消费组(Consumer_Group)中的消费者(Consumer)具有唯一名称
2.3 消息ID
Stream的消息ID可以由服务端自动生成,也可以由客户端传入,如下图是自动生成的结构:millisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID。
sequenceNumber指的是序列号,在相同的millisecondsTime毫秒下,序列号从0开始递增,序列号是64位长度,理论上在统一毫秒内生成的数据量无法到达这个级别,因此不用担心sequenceNumber会不够用。
客户端显示传入规则
Redis对于ID有强制要求,格式必须是-,最小ID为0-1,并且后续ID不能小于前一个ID
2.4 消息内容
Stream的消息内容,也就是图中的Message Content它的结构类似Hash结构,以key-value的形式存在。
3、Stream指令
3.1 指令汇总
Stream的指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。
消息队列相关指令:
3.2 XADD
XADD 用于向Stream 队列中添加消息,如果指定的Stream 队列不存在,则该命令执行时会新建一个Stream 队列。
XADD的指令语法:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]
如下通过XADD展示了定义ID的两种方式,具体可以看2.3。
3.2 XTRIM
XTRIM 用于对Stream的长度进行限定。
XTRIM 的指令语法:
XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
MAXLEN 允许的最大长度,如果长度超出则会抛弃队列前面的消息
MINID 允许的最小id,从某个id值开始保留,其余的将会被抛弃
3.4 XLEN
XLEN 用于获取Stream 队列的消息的长度。
XLEN 的指令语法:
XLEN key3.5 XRANGE
XRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。
XRANGE 的指令语法:
XRANGE key start end [COUNT count]
start 表示开始值,-代表最小值
end 表示结束值,+代表最大值
count 表示最多获取多少个值
3.6 XREVRANGE
XREVRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。与XRANGE 的区别在于,获取消息列表元素的方向是相反的,end在前,start在后。
XREVRANGE 的指令语法:
XREVRANGE key end start [COUNT count]3.7 XREAD
XREAD 用于获取消息(阻塞/非阻塞),只会返回大于指定ID的消息。
XREAD 的指令语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
COUNT 最多读取多少条消息
BLOCK 是否已阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永远阻塞
$代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil。
0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)。
3.8 XGROUP CREATE
XGROUP CREATE 用于创建消费者组。
XGROUP CREATE 的指令语法:
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]
XGROUP CREATE中的指令没什么复杂的,第一个中括号中的几个参数最为重要,如下图两种方式:
$表示从Stream尾部开始消费,会忽略Stream中目前已有的数据
0表示从Stream头部开始消费
3.9 XREADGROUP GROUP
XREADGROUP GROUP 用于读取消费者组中的消息。
XREADGROUP GROUP 的指令语法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
注意,这里有一个比较重要的知识点,刚开始的时候可能容易搞错:
>这个特殊符号表示消息到目前为止,从未传递给其他消费者的消息
0表示指定消息ID,因为ID均大于0-0(0代指0-0),因此代表从Stream 的队列头部开始获取消息
在如下截图中,为何第一次 mystream 0 获取消息返回empty,在执行完 mystream > 之后,第二 mystream 0 却成功的获取到了消息,但是很明显mystream中刚添加了两条消息,第一次不应该失败才对呀?
这是因为,当指定ID进行消息获取时,命令将会让我们访问我们的历史待处理消息(曾被获取,但是未ack)。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。
3.11 XPENDING
XPENDING 用于打印待处理消息的详细信息。
XPENDING 指令是非常有用的,因为它可以打印待处理消息的信息。如果在一个消费者组中存在多个消费者,如果存在部分消费者永久的故障,无法再处理消息了,我们就可以通过XPENDING 指令来查看指定消费者组中的消费者未ack的消息,然后转移给其他消费者进行处理。
XPENDING 的指令语法:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
XPENDING 返回值解析:
第一个参数表示当前消费者中待处理消息的总数
第二个参数表示待处理消息的最小ID
第三个参数表示待处理消息的最大ID
第四个参数表示消费者列表和未处理的消息数量image.png
3.11 XCLAIM
XCLAIM 用于转移消息的归属权。
XCLAIM 的指令语法:
XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RET
指令参数解析:
key 表示Stream的名称
group 表示需要转移消息的归属权的消费者组名称
consumer 表示接收消息的消费者名称
min-idle-time 表示最小空闲时间,只有后续指定ID的消息空闲时间大于指定的空闲时间,消息归属权转移指令才会生效
ID [] 需要转移归属权的消息ID,数组,可以是多个
示例中,将consumer-1中ID为1631719560149-0的未处理的消息的归属权转移到consumer-2下:
3.13 XINFO
XINFO 用于打印Stream\Consumer\Group的详细信息。
XINFO 的指令语法:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
示例打印指定STREAM的详细消息
4、关于Stream优化内存的事情
使用Stream有两个点需要注意,如果使用不当都会导致内存消耗增大。
待处理消息过多,消息未及时ack
Stream消息持续持久化,使用XDEL删除消息
关于第一点,待处理消息过多,消息未及时ack,其导致内存增加的原因是,Stream会为每个消费者维护一个PEL列表,PEL列表用于存储处理完但未及时ack的消息ID。我们在实际使用过程中,处理完的消息一定要及时ack,也有定时检查是否有消费者不可用导致消息堆积的情况。
XPENDING能查询出消费者中待处理的消息,就是因为有PEL的存在。