Redis精通系列——Stream

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis精通系列——Stream

 本文已收录于专栏


《Redis精通系列》


上千人点赞收藏,全套Redis学习资料,大厂必备技能!


目录


1、简介


2、Stream内部探索


2.1 Stream 结构


2.2 四个唯一


2.3 消息ID


2.4 消息内容


3、Stream指令


3.1 指令汇总


3.2 XADD


3.2 XTRIM


3.3 XDEL


3.4 XLEN


3.5 XRANGE


3.6 XREVRANGE


3.7 XREAD


3.8 XGROUP CREATE


3.9 XREADGROUP GROUP


3.10 XACK


3.11 XPENDING


3.11 XCLAIM


3.13 XINFO


4、关于Stream优化内存的事情


1、简介

Stream弥补了Redis作为MQ(message queue)技术选型上的不足之处;Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合sentinel或cluster使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司MQ的技术选型产品,但是关于Stream的使用和特性(消费组),仍值得一探究竟。


Stream对标消息队列,因此几乎具备了MQ所有的特性,以下列出Stream所具有的部分特性:


消息顺序存储

消息ID序列化规则生成

消息的遍历

消息阻塞/非阻塞式获取

客户端分组消费消息

消息确认机制

消息异常机制

消息队列监控

在文中也会说到Stream的这些特性。


2、Stream内部探索

2.1 Stream 结构

在探索Stream的内部结构之前,先看一张清晰的Stream结构图:

image.png如下是关于上图的名词解析:


Message Content:消息内容

Consumer group:消费组,通过XGROUP CREATE 命令创建,一个消费组可以有多个消费者

Last_delivered_id:游标,每个消费组有一个游标,任意消费者读取消息后,游标都会向前移动

Consumer:消费者,消费组中的消费者

Pending_ids:状态变量,每个消费者会有一个状态变量,用于记录被当前消费者读取,但是并未ack的消息id

2.2 四个唯一

Stream内部维护了一个消息链表,以此使得消息能够具有队列的特性。在Stream中有四个唯一需要了解:


每个Stream都具有唯一的名称

每个消息(Message)都具有一个由系统分配或者客户端指定唯一ID

每个Stream中的消费组(Consumer_Group)具有唯一名称

每个消费组(Consumer_Group)中的消费者(Consumer)具有唯一名称

2.3 消息ID

Stream的消息ID可以由服务端自动生成,也可以由客户端传入,如下图是自动生成的结构:image.pngmillisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID。

sequenceNumber指的是序列号,在相同的millisecondsTime毫秒下,序列号从0开始递增,序列号是64位长度,理论上在统一毫秒内生成的数据量无法到达这个级别,因此不用担心sequenceNumber会不够用。


客户端显示传入规则

Redis对于ID有强制要求,格式必须是-,最小ID为0-1,并且后续ID不能小于前一个ID


2.4 消息内容

Stream的消息内容,也就是图中的Message Content它的结构类似Hash结构,以key-value的形式存在。


3、Stream指令

3.1 指令汇总

Stream的指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。

消息队列相关指令:


image.png3.2 XADD

XADD 用于向Stream 队列中添加消息,如果指定的Stream 队列不存在,则该命令执行时会新建一个Stream 队列。


XADD的指令语法:


XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]


如下通过XADD展示了定义ID的两种方式,具体可以看2.3。

image.png3.2 XTRIM

XTRIM 用于对Stream的长度进行限定。


XTRIM 的指令语法:


XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]


MAXLEN 允许的最大长度,如果长度超出则会抛弃队列前面的消息

MINID 允许的最小id,从某个id值开始保留,其余的将会被抛弃image.png

3.4 XLEN

XLEN 用于获取Stream 队列的消息的长度。

XLEN 的指令语法:

XLEN keyimage.png3.5 XRANGE

XRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。


XRANGE 的指令语法:


XRANGE key start end [COUNT count]


start 表示开始值,-代表最小值

end 表示结束值,+代表最大值

count 表示最多获取多少个值

image.png3.6 XREVRANGE

XREVRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。与XRANGE 的区别在于,获取消息列表元素的方向是相反的,end在前,start在后。


XREVRANGE 的指令语法:


XREVRANGE key end start [COUNT count]image.png3.7 XREAD

XREAD 用于获取消息(阻塞/非阻塞),只会返回大于指定ID的消息。


XREAD 的指令语法:


XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]


COUNT 最多读取多少条消息

BLOCK 是否已阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永远阻塞

$代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil。


0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)。


image.png

image.png3.8 XGROUP CREATE

XGROUP CREATE 用于创建消费者组。


XGROUP CREATE 的指令语法:


XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]


XGROUP CREATE中的指令没什么复杂的,第一个中括号中的几个参数最为重要,如下图两种方式:


$表示从Stream尾部开始消费,会忽略Stream中目前已有的数据

0表示从Stream头部开始消费

image.png3.9 XREADGROUP GROUP

XREADGROUP GROUP 用于读取消费者组中的消息。


XREADGROUP GROUP 的指令语法:


XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]


注意,这里有一个比较重要的知识点,刚开始的时候可能容易搞错:

>这个特殊符号表示消息到目前为止,从未传递给其他消费者的消息

0表示指定消息ID,因为ID均大于0-0(0代指0-0),因此代表从Stream 的队列头部开始获取消息


在如下截图中,为何第一次 mystream 0 获取消息返回empty,在执行完 mystream > 之后,第二 mystream 0 却成功的获取到了消息,但是很明显mystream中刚添加了两条消息,第一次不应该失败才对呀?

这是因为,当指定ID进行消息获取时,命令将会让我们访问我们的历史待处理消息(曾被获取,但是未ack)。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。

image.pngimage.png3.11 XPENDING

XPENDING 用于打印待处理消息的详细信息。


XPENDING 指令是非常有用的,因为它可以打印待处理消息的信息。如果在一个消费者组中存在多个消费者,如果存在部分消费者永久的故障,无法再处理消息了,我们就可以通过XPENDING 指令来查看指定消费者组中的消费者未ack的消息,然后转移给其他消费者进行处理。


XPENDING 的指令语法:


XPENDING key group [[IDLE min-idle-time] start end count [consumer]]


XPENDING 返回值解析:


第一个参数表示当前消费者中待处理消息的总数

第二个参数表示待处理消息的最小ID

第三个参数表示待处理消息的最大ID

第四个参数表示消费者列表和未处理的消息数量image.pngimage.png




3.11 XCLAIM

XCLAIM 用于转移消息的归属权。


XCLAIM 的指令语法:


XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RET


指令参数解析:


key 表示Stream的名称

group 表示需要转移消息的归属权的消费者组名称

consumer 表示接收消息的消费者名称

min-idle-time 表示最小空闲时间,只有后续指定ID的消息空闲时间大于指定的空闲时间,消息归属权转移指令才会生效

ID [] 需要转移归属权的消息ID,数组,可以是多个

示例中,将consumer-1中ID为1631719560149-0的未处理的消息的归属权转移到consumer-2下:

image.png3.13 XINFO

XINFO 用于打印Stream\Consumer\Group的详细信息。


XINFO 的指令语法:


XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]


示例打印指定STREAM的详细消息

image.png4、关于Stream优化内存的事情

使用Stream有两个点需要注意,如果使用不当都会导致内存消耗增大。


待处理消息过多,消息未及时ack

Stream消息持续持久化,使用XDEL删除消息

关于第一点,待处理消息过多,消息未及时ack,其导致内存增加的原因是,Stream会为每个消费者维护一个PEL列表,PEL列表用于存储处理完但未及时ack的消息ID。我们在实际使用过程中,处理完的消息一定要及时ack,也有定时检查是否有消费者不可用导致消息堆积的情况。

XPENDING能查询出消费者中待处理的消息,就是因为有PEL的存在。image.pngimage.png


image.png

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
消息中间件 存储 NoSQL
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
245 0
|
7月前
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
别再用 Redis List 实现消息队列了,Stream 专为队列而生
144 0
|
2月前
|
消息中间件 NoSQL Redis
Redis Stream
10月更文挑战第20天
29 2
|
6月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
74 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
27 2
|
4月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
78 0
|
7月前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
NoSQL Redis
redis序列化问题:invalid stream header
redis序列化问题:invalid stream header
634 0
|
7月前
|
消息中间件 存储 负载均衡
Redis类型 Stream Bitfield
Redis类型 Stream Bitfield
44 0
|
7月前
|
消息中间件 存储 NoSQL
Redis Stream: 实时消息处理的利器,让你的数据流畅又可靠!
Redis Stream: 实时消息处理的利器,让你的数据流畅又可靠!
397 0