Redis5新特性Streams作消息队列

简介:

Redis5新特性Streams作消息队列
前言
Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中常用 API 进行分析。

准备
本文所使用 Redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不同。

添加消息
Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

XADD key ID field string [field string ...]
其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 person 为名称的流,进行操作。

XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1578238486193-0"

这里自动生成的 Id 格式为 -
Id 是由两部分组成:

millisecondsTime 为当前服务器时间毫秒时间戳。
sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。
比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

Id 中的前后部分必须为数字。
最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。
添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。
否则,当不满足上述条件时,添加后会抛出异常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。
Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

XADD person MAXLEN 5 * name ytao des https://ytao.top
这样就指定该了 Streams 中的容量为 5 条消息。也可使用 XTRIM 截取消息,从小到大剔除多余的消息:

XTRIM person MAXLEN 8
消息数量
查看消息数量使用 XLEN 指令进行操作。

XLEN key
例:查看 person 流中的消息数量:

XLEN person
(integer) 5

查询消息
查询 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE
查询数据时,可以按照指定 Id 范围进行查询,XRANGE 查询指令格式:

XRANGE key start end [COUNT count]
参数说明:

key 为 Streams 的名称
start 为范围查询开始 Id,包含本 Id。
start 为范围查询结束 Id,包含本 Id。
Count 为查询返回最大的消息数量,非必填。
这里 start 和 end 有-和+两个非指定值,他们分别表示无穷小和无穷大,所以当使用这个两个值时,会查询出全部的消息。

XRANGE person - +

1) 1) "0-1"
2) 1) "name"

  2) "ytao"
  3) "des"
  4) "https://ytao.top"

2) 1) "0-2"
2) 1) "name"

  2) "luffy"
  3) "des"
  4) "valiant!"

3) 1) "2-0"
2) 1) "name"

  2) "gaga"
  3) "des"
  4) "fishion!"

上面查询的消息数据,可以看到是按照先进先出的顺序查询出来的。

使用 COUNT 指定查询返回的数量:

查询所有的消息,并且返回一条数据

XRANGE person - + COUNT 1
1) 1) "0-1"

2) 1) "name"

  2) "ytao"
  3) "des"
  4) "https://ytao.top"

在范围查询中,Id 的后半部分可省略,后半部分中的数据会全部查询到。

XREVRANGE
XREVRANGE 的查询和 XRANGE 指令中的使用类似,但查询的 start 和 end 参数顺序进行了调换:

XREVRANGE key end start [COUNT count]
使用案例:

XREVRANGE person + -
1) 1) "2-0"

2) 1) "name"

  2) "gaga"
  3) "des"
  4) "fishion!"

2) 1) "0-2"
2) 1) "name"

  2) "luffy"
  3) "des"
  4) "valiant!"

3) 1) "0-1"
2) 1) "name"

  2) "ytao"
  3) "des"
  4) "https://ytao.top"

查询后的结果与 XRANGE 的结果顺序刚好相反,其他都一样,这两个指令可进行消息的升序和降序的返回。

删除消息
删除消息使用 XDEL 指令操作,只需指定将要删除的 Streams 名称和 Id 即可,支持一次删除多个消息 。

XDEL key ID [ID ...]
删除案例:

查询所有消息

XRANGE person - +
1) 1) "0-1"

2) 1) "name"

  2) "ytao"
  3) "des"
  4) "https://ytao.top"

2) 1) "0-2"
2) 1) "name"

  2) "luffy"
  3) "des"
  4) "valiant!"

3) 1) "2-0"
2) 1) "name"

  2) "gaga"
  3) "des"
  4) "fishion!"

删除消息

XDEL person 2-0
(integer) 1

再次查询删除后的所有消息

XRANGE person - +

1) 1) "0-1"
2) 1) "name"

  2) "ytao"
  3) "des"
  4) "https://ytao.top"

2) 1) "0-2"
2) 1) "name"

  2) "luffy"
  3) "des"
  4) "valiant!"

查询删除后的长度

XLEN person
(integer) 2

从上面可以看到,删除消息后,长度也会减少相应的数量。

消费消息
在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费
单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

XREAD COUNT 1 STREAMS person 0

1) 1) "person"
2) 1) 1) "0-1"

     2) 1) "name"
        2) "ytao"
        3) "des"
        4) "https://ytao.top"

XREAD COUNT 2 STREAMS person 0

1) 1) "person"
2) 1) 1) "0-1"

     2) 1) "name"
        2) "ytao"
        3) "des"
        4) "https://ytao.top"
  2) 1) "0-2"
     2) 1) "name"
        2) "luffy"
        3) "des"
        4) "valiant!"

在 XREAD 里面还有个 BLOCK 参数,这个是用来阻塞订阅消息的,BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

窗口 1 开启阻塞,等待新消息的到来

XREAD BLOCK 0 STREAMS person $

另开一个连接窗口 2,添加一条新的消息

XADD person 2-2 name tao des coder

"2-2"

窗口 1,获取到有新的消息来消费,并且带有阻塞的时间

XREAD BLOCK 0 STREAMS person $

1) 1) "person"
2) 1) 1) "2-2"

     2) 1) "name"
        2) "tao"
        3) "des"
        4) "coder"

(60.81s)
当使用 XREAD 进行顺序消费时,需要额外记录下读取到位置的 Id,方便下次继续消费。

群组消费
群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:创建群组,群组读取消息,向服务端确认消息以处理。

群组操作
操作群组使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操作有:

CREATE 创建消费组。
SETID 修改下一个处理消息的 Id。
DESTROY 销毁消费组。
DELCONSUMER 删除消费组中指定的消费者。
我们当前需要使用的是创建消费组:

以当前存在的最大 Id 作为消费起始

XGROUP CREATE person group1 $
OK

群组读取消息
群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息:

窗口 1,消费群组中,taotao 消费者建立阻塞监听

XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

窗口 2,消费群组中,yangyang 消费者建立阻塞监听

XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

窗口 3,添加消费消息

XADD person 3-1 name tony des 666

"3-1"

窗口 1,读取到新消息,此时 窗口 2 没有任何反应

XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

1) 1) "person"
2) 1) 1) "3-1"

     2) 1) "name"
        2) "tony"
        3) "des"
        4) "666"

(77.54s)

窗口 3,再次添加消费消息

XADD person 3-2 name james des abc!
"3-2"

窗口 2,读取到新消息,此时 窗口 1 没有任何反应

XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

1) 1) "person"
2) 1) 1) "3-2"

     2) 1) "name"
        2) "james"
        3) "des"
        4) "abc!"

(76.36s)
以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。

消息ACK
消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。
例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"

2) 1) 1) "3-2"

     2) 1) "name"
        2) "james"
        3) "des"
        4) "abc!"

这时,我们使用 XACK 指令告诉服务器,我们已处理的消息:

XACK key group ID [ID ...]0
让服务器标记 3-2 已处理:

XACK person group1 3-2
(integer) 1

再次获取群组读取消息:

XREADGROUP GROUP group1 yangyang STREAMS person 0

1) 1) "person"
2) (empty list or set)
队列中没有了可读消息。
除了上面以讲解到的 API 外,查看消费群组信息可使用 XINFO 指令查看,本文不做分析。

总结
上面对 Streams 常用 API 进行了分析,我们可以感受到 Redis 在消息队列支持的道路上,也越来越强大。如果使用过它的 PUB/SUB 功能的话,就会感受到 5.x 迭代正是将你的一些痛点进行了优化。

原文地址https://www.cnblogs.com/ytao-blog/p/12522070.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
74 6
|
19天前
|
存储 缓存 监控
利用 Redis 缓存特性避免缓存穿透的策略与方法
【10月更文挑战第23天】通过以上对利用 Redis 缓存特性避免缓存穿透的详细阐述,我们对这一策略有了更深入的理解。在实际应用中,我们需要根据具体情况灵活运用这些方法,并结合其他技术手段,共同保障系统的稳定和高效运行。同时,要不断关注 Redis 缓存特性的发展和变化,及时调整策略,以应对不断出现的新挑战。
53 10
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
96 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
27 2
|
1月前
|
存储 消息中间件 NoSQL
【redis】redis的特性和主要应用场景
【redis】redis的特性和主要应用场景
103 1
|
1月前
|
NoSQL 关系型数据库 MySQL
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
本文全面阐述了Redis事务的特性、原理、具体命令操作,指出Redis事务具有原子性但不保证一致性、持久性和隔离性,并解释了Redis事务的适用场景和WATCH命令的乐观锁机制。
202 0
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
|
1月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
37 0
|
3月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
69 0
|
3月前
|
NoSQL Java 调度
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
|
3月前
|
NoSQL 网络协议 安全
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决