Stream

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Stream弥补了Redis作为MQ(message queue)技术选型上的不足之处;Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合sentinel或cluster使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司MQ的技术选型产品,但是关于Stream的使用和特性(消费组),仍值得一探究竟。

1、简介

Stream弥补了Redis作为MQ(message queue)技术选型上的不足之处;Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合sentinel或cluster使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司MQ的技术选型产品,但是关于Stream的使用和特性(消费组),仍值得一探究竟。

Stream对标消息队列,因此几乎具备了MQ所有的特性,以下列出Stream所具有的部分特性:

  • 消息顺序存储
  • 消息ID序列化规则生成
  • 消息的遍历
  • 消息阻塞/非阻塞式获取
  • 客户端分组消费消息
  • 消息确认机制
  • 消息异常机制
  • 消息队列监控

在文中也会说到Stream的这些特性。

2、Stream内部探索

2.1 Stream 结构

在探索Stream的内部结构之前,先看一张清晰的Stream结构图:

如下是关于上图的名词解析:

  • Message Content:消息内容
  • Consumer group:消费组,通过XGROUP CREATE 命令创建,一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组有一个游标,任意消费者读取消息后,游标都会向前移动
  • Consumer:消费者,消费组中的消费者
  • Pending_ids:状态变量,每个消费者会有一个状态变量,用于记录被当前消费者读取,但是并未ack的消息id


2.2 四个唯一

Stream内部维护了一个消息链表,以此使得消息能够具有队列的特性。在Stream中有四个唯一需要了解:

  1. 每个Stream都具有唯一的名称
  2. 每个消息(Message)都具有一个由系统分配或者客户端指定唯一ID
  3. 每个Stream中的消费组(Consumer_Group)具有唯一名称
  4. 每个消费组(Consumer_Group)中的消费者(Consumer)具有唯一名称

2.3 消息ID

Stream的消息ID可以由服务端自动生成,也可以由客户端传入,如下图是自动生成的结构:

系统自动生成的规则

<millisecondsTime>-<sequenceNumber>

millisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID。

sequenceNumber指的是序列号,在相同的millisecondsTime毫秒下,序列号从0开始递增,序列号是64位长度,理论上在统一毫秒内生成的数据量无法到达这个级别,因此不用担心sequenceNumber会不够用。

客户端显示传入规则

Redis对于ID有强制要求,格式必须是<millisecondsTime>-<sequenceNumber>,最小ID为0-1,并且后续ID不能小于前一个ID


2.4 消息内容

Stream的消息内容,也就是图中的Message Content它的结构类似Hash结构,以key-value的形式存在。

3、Stream指令

3.1 指令汇总

Stream的指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。

消息队列相关指令:

令名称

指令作用

XADD

添加消息到队列末尾

XTRIM

限制Stream的长度,如果已经超长会进行截取

XDEL

删除消息

XLEN

获取Stream中的消息长度

XRANGE

获取消息列表(可以指定范围),忽略删除的消息

XREVRANGE

和XRANGE相比区别在于反向获取,ID从大到小

XREAD

获取消息(阻塞/非阻塞),返回大于指定ID的消息

消费组相关指令:

令名称

指令作用

XGROUP CREATE

创建消费者组

XREADGROUP GROUP

读取消费者组中的消息

XACK

ack消息,消息被标记为“已处理”

XGROUP SETID

设置消费者组最后递送消息的ID

XGROUP DELCONSUMER

删除消费者组

XPENDING

打印待处理消息的详细信息

XCLAIM

转移消息的归属权(长期未被处理/无法处理的消息,转交给其他消费者组进行处理)

XINFO

打印Stream\Consumer\Group的详细信息

XINFO GROUPS

打印消费者组的详细信息

XINFO STREAM

打印Stream的详细信息


3.2 XADD

XADD 用于向Stream 队列中添加消息,如果指定的Stream 队列不存在,则该命令执行时会新建一个Stream 队列。

XADD的指令语法:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

如下通过XADD展示了定义ID的两种方式,具体可以看2.3。

3.2 XTRIM

XTRIM 用于对Stream的长度进行限定。

XTRIM 的指令语法:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

  • MAXLEN 允许的最大长度,如果长度超出则会抛弃队列前面的消息
  • MINID 允许的最小id,从某个id值开始保留,其余的将会被抛弃

3.3 XDEL

XDEL 用于删除消息。

XDEL 的指令语法:

XDEL key ID [ID ...]


3.4 XLEN

XLEN 用于获取Stream 队列的消息的长度。

XLEN 的指令语法:

XLEN key

3.5 XRANGE

XRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。

XRANGE 的指令语法:

XRANGE key start end [COUNT count]

  • start 表示开始值,-代表最小值
  • end 表示结束值,+代表最大值
  • count 表示最多获取多少个值

3.6 XREVRANGE

XREVRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。与XRANGE 的区别在于,获取消息列表元素的方向是相反的,end在前,start在后。

XREVRANGE 的指令语法:

XREVRANGE key end start [COUNT count]

3.7 XREAD

XREAD 用于获取消息(阻塞/非阻塞),只会返回大于指定ID的消息。

XREAD 的指令语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • COUNT 最多读取多少条消息
  • BLOCK 是否已阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永远阻塞

$代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil。

0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)。

阻塞方式获取Stream中的指令,这里演示阻塞获取一条消息

3.8 XGROUP CREATE

XGROUP CREATE 用于创建消费者组。

XGROUP CREATE 的指令语法:

XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

XGROUP CREATE中的指令没什么复杂的,第一个中括号中的几个参数最为重要,如下图两种方式:

  • $表示从Stream尾部开始消费,会忽略Stream中目前已有的数据
  • 0表示从Stream头部开始消费

如果Stream不存在,XGROUP CREATE 语法将会报错,因此可以得出不允许在不存在的Stream上创建消费者组


3.9 XREADGROUP GROUP

XREADGROUP GROUP 用于读取消费者组中的消息。

XREADGROUP GROUP 的指令语法:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

注意,这里有一个比较重要的知识点,刚开始的时候可能容易搞错:

>这个特殊符号表示消息到目前为止,从未传递给其他消费者的消息

0表示指定消息ID,因为ID均大于0-0(0代指0-0),因此代表从Stream 的队列头部开始获取消息

在如下截图中,为何第一次 mystream 0 获取消息返回empty,在执行完 mystream > 之后,第二 mystream 0 却成功的获取到了消息,但是很明显mystream中刚添加了两条消息,第一次不应该失败才对呀?

这是因为,当指定ID进行消息获取时,命令将会让我们访问我们的历史待处理消息(曾被获取,但是未ack)。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。


XREADGROUP GROUP 也可以像XREAD 一样使用阻塞的方式获取消息

当向mystream中添加消息后,阻塞读返回

3.10 XACK

XACK 用于标记为“已处理”。


XACK 的指令语法:

XACK key group ID [ID ...]


结合XREADGROUP GROUP 中指定ID的方式只能获取未ack的未处理消息的特性,测试XACK指令。从如下的测试示例中可以得出两个结论:

  • 消息首次ack成功,返回1,ack失败返回0
  • 3.9中的结论是正确的


3.11 XPENDING

XPENDING 用于打印待处理消息的详细信息。


XPENDING 指令是非常有用的,因为它可以打印待处理消息的信息。如果在一个消费者组中存在多个消费者,如果存在部分消费者永久的故障,无法再处理消息了,我们就可以通过XPENDING 指令来查看指定消费者组中的消费者未ack的消息,然后转移给其他消费者进行处理。


XPENDING 的指令语法:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

XPENDING 返回值解析:

  1. 第一个参数表示当前消费者中待处理消息的总数
  2. 第二个参数表示待处理消息的最小ID
  3. 第三个参数表示待处理消息的最大ID
  4. 第四个参数表示消费者列表和未处理的消息数量

3.11 XCLAIM

XCLAIM 用于转移消息的归属权。

XCLAIM 的指令语法:

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RET


指令参数解析:

  • key 表示Stream的名称
  • group 表示需要转移消息的归属权的消费者组名称
  • consumer 表示接收消息的消费者名称
  • min-idle-time 表示最小空闲时间,只有后续指定ID的消息空闲时间大于指定的空闲时间,消息归属权转移指令才会生效
  • ID [] 需要转移归属权的消息ID,数组,可以是多个

示例中,将consumer-1中ID为1631719560149-0的未处理的消息的归属权转移到consumer-2下:


3.13 XINFO

XINFO 用于打印Stream\Consumer\Group的详细信息。

XINFO  的指令语法:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

示例打印指定STREAM的详细消息


4、关于Stream优化内存的事情

使用Stream有两个点需要注意,如果使用不当都会导致内存消耗增大。

  1. 待处理消息过多,消息未及时ack
  2. Stream消息持续持久化,使用XDEL删除消息

关于第一点,待处理消息过多,消息未及时ack,其导致内存增加的原因是,Stream会为每个消费者维护一个PEL列表,PEL列表用于存储处理完但未及时ack的消息ID。我们在实际使用过程中,处理完的消息一定要及时ack,也有定时检查是否有消费者不可用导致消息堆积的情况。

XPENDING能查询出消费者中待处理的消息,就是因为有PEL的存在。

关于第二点,使用XDEL删除Stream中不在需要的消息,其导致内存增加的原因是,Stream的XDEL删除消息的指令,并不会从内存上删除消息,它只是给消息打上标记位,下次通过XRANGE指令忽略这些消息而已。因此我们可以设置Stream的最大长度,来解决这个问题,在XADD中使用MAXLEN指定Stream队列的长度,当消息超出长度就会将队列头消息清除掉。(不过这种处理方式一定要做到及时处理消息,避免消息的丢失。)

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]


相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
SQL 关系型数据库 测试技术
沉浸式学习PostgreSQL|PolarDB 20: 学习成为数据库大师级别的优化技能
在上一个实验《沉浸式学习PostgreSQL|PolarDB 19: 体验最流行的开源企业ERP软件 odoo》 中, 学习了如何部署odoo和polardb|pg. 由于ODOO是非常复杂的ERP软件, 对于关系数据库的挑战也非常大, 所以通过odoo业务可以更快速提升同学的数据库优化能力, 发现业务对数据库的使用问题(如索引、事务对锁的运用逻辑问题), 数据库的代码缺陷, 参数或环境配置问题, 系统瓶颈等.
1093 1
|
7月前
|
Swift iOS开发 开发者
苹果app上架-ios上架苹果商店app store 之苹果支付In - App Purchase内购配置-优雅草卓伊凡
苹果app上架-ios上架苹果商店app store 之苹果支付In - App Purchase内购配置-优雅草卓伊凡
723 13
苹果app上架-ios上架苹果商店app store 之苹果支付In - App Purchase内购配置-优雅草卓伊凡
|
存储 Java
java 服务 JVM 参数设置配置
java 服务 JVM 参数设置配置
309 3
|
11月前
|
编解码 监控 网络协议
HLS 和 RTSP 的优势
【10月更文挑战第25天】HLS和RTSP各自的优势使其在不同的应用场景中发挥着重要作用。HLS适用于需要广泛兼容性、自适应码率和简单部署的场景,如在线视频点播、直播等;而RTSP则更适合对实时性、精确播放控制和互操作性要求较高的专业级实时流媒体应用。了解它们的优势有助于根据具体的项目需求选择最合适的流媒体传输协议。
348 61
|
编译器 C语言
为什么被调函数内部不能用 sizeof(arr) / size(arr[0]) 计算数组长度?
该文解答了一个关于C语言的疑问,涉及64位RedPandaDevc++编译器。示例代码展示了不能通过`sizeof(arr)/sizeof(arr[0])`在函数中计算数组长度的问题,因为`arr`在函数中作为指针传递,`sizeof(arr)`返回指针大小(可能是4或8字节),而非数组长度。因此,代码在函数内输出可能为2。而在`main()`函数中,`sizeof(arr)`会计算整个数组大小,正确返回数组长度。文章强调了数组名在不同上下文中的差异以及`sizeof`操作符的使用注意事项。
290 4
|
9月前
|
人工智能 关系型数据库 分布式数据库
PolarDB-PG AI最佳实践3 :PolarDB AI多模态相似性搜索最佳实践
本文介绍了如何利用PolarDB结合多模态大模型(如CLIP)实现数据库内的多模态数据分析和查询。通过POLAR_AI插件,可以直接在数据库中调用AI模型服务,无需移动数据或额外的工具,简化了多模态数据的处理流程。具体应用场景包括图像识别与分类、图像到文本检索和基于文本的图像检索。文章详细说明了技术实现、配置建议、实战步骤及多模态检索示例,展示了如何在PolarDB中创建模型、生成embedding并进行相似性检索
|
11月前
|
存储 人工智能 文字识别
AI与OCR:数字档案馆图像扫描与文字识别技术实现与项目案例
本文介绍了纸质档案数字化的技术流程,包括高精度扫描、图像预处理、自动边界检测与切割、文字与图片分离抽取、档案识别与文本提取,以及识别结果的自动保存。通过去噪、增强对比度、校正倾斜等预处理技术,提高图像质量,确保OCR识别的准确性。平台还支持多字体识别、批量处理和结构化存储,实现了高效、准确的档案数字化。具体应用案例显示,该技术在江西省某地质资料档案馆中显著提升了档案管理的效率和质量。
1103 1
|
NoSQL 算法 Java
诡异,Redis Proxy RT上升后连接倾斜
本文细致地描述了关于Redis Proxy RT上升后连接倾斜问题的排查过程和根本原因,最后给出了优化方案。
|
存储 大数据 关系型数据库
【数据库三大范式】让我们来聊一聊数据库的三大范式和反范式设计
数据库三大范式是指数据库设计中的规范化原则,它们分别是第一范式(1NF)第二范式(2NF)和第三范式(3NF)。第一范式(1NF)第二范式(2NF)第三范式(3NF)
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
369 0