别再用 Redis List 实现消息队列了,Stream 专为队列而生

简介: Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

XADD:插入消息


「云岚宗众弟子听命,击杀萧炎!」


当云山最后一字落下,那弥漫的紧绷气氛,顿时宣告破碎,悬浮半空的众多云岚宗长老背后双翼一振,便是咻咻的划过天际,追杀萧炎。


云山使用以下指令向队列中插入「追杀萧炎」命令,让长老带领子弟去执行。


XADD 云岚宗 * task kill name 萧炎
"1645936602161-0"


Stream 中的每个元素由键值对的形式组成,不同元素可以包含不同数量的键值对

该命令的语法如下:


XADD streamName id field value [field value ...]


消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。


消息 ID 由两部分组成:


  • 当前毫秒内的时间戳;


  • 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。


通过将元素ID与时间进行关联,并强制要求新元素的ID必须大于旧元素的ID, Redis从逻辑上将流变成了一种只执行追加操作(append only)的数据结构。


这种特性对于使用流实现消息队列和事件系统的用户来说是非常重要的:


用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。


XREAD:读取消息


云凌老狗使用如下指令接收云山的命令:


XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) "萧炎" # 萧炎


XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]


该指令可以同时对多个流进行读取,每个心法对应含义如下:


  • COUNT:表示每个流中最多读取的元素个数;


  • BLOCK:阻塞读取,当消息队列没有消息的时候,则阻塞等待, 0 表示无限等待,单位是毫秒。


  • ID:消息 ID,在读取消息的时候可以指定 ID,并从这个 ID 的下一条消息开始读取,0-0 则表示从第一个元素开始读取


如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。


云韵宗主,我今天刚到云岚宗,历史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻开始通过 XADD 发布的消息要咋整?


运行「$」心法即可,心法的最后 「$」符号表示读取最新的阻塞消息,读取不到则一直死等。


等待过程中,其他长老向队列追加消息,则会立即读取到。


XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 $


这么容易就实现消息队列了么?说好的 ACK 机制呢?


这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 2 BLOCK 0 STREAMS 云岚宗 0-0 指令的时候又会重新读取到。


所以我们还需要 ACK 机制,


接下来,我们来一个真正的消息队列。


ConsumerGroup


Redis Stream 的 ConsumerGroup(消费者组)允许用户将一个流从逻辑上划分为多个不同的流,并让 ConsumerGroup 的消费者去处理。


它是一个强大的支持多播的可持久化的消息队列。 Redis Stream 借鉴了 Kafka 的设计。


Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。


image.png


  • Redis Stream 的结构如上图所示。有一个消息链表,每个消息都有一个唯一的 ID 和对应的内容;


  • 消息持久化;


  • 每个消费组的状态是独立的,不不影响,同一份的 Stream 消息会被所有的消费组消费;


  • 一个消费组可以有多个消费者组成,消费者之间是竞争关系,任意一个消费者读取了消息都会使 last_deliverd_id 往前移动;


  • 每个消费者有一个 pending_ids 变量,用于记录当前消费者读取了但是还没 ack 的消息。它用来保证消息至少被客户端消费了一次。


消费组实现的消息队列主要涉及以下三个指令:


  • XGROUP用于创建、销毁和管理消费者组。


  • XREADGROUP用于通过消费者组从流中读取。


  • XACK是允许消费者将待处理消息标记为已正确处理的命令。


创建消费组


Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。


我们使用 XADD 往 bossStream 队列插入一些消息:


XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40


如下指令,为消息队列名为 bossStream 创建「青龙门」和「六扇门」两个消费组。


# 语法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龙门 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇门 0-0 MKSTREAM


  • stream:指定队列的名字;


  • group:指定消费组名字;


  • start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。


  • MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。


读取消息


让「青龙门」消费组的 consumer1bossStream 阻塞读取一条消息:


XREADGROUP GROUP 青龙门 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"


语法如下:


XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]


[] 内的表示可选参数,该命令与 XREAD 大同小异,区别在于新增 GROUP groupName consumerName 选项。


该选项的两个参数分别用于指定被读取的消费者组以及负责处理消息的消费者。

其中:


  • >:命令的最后参数 >,表示从尚未被消费的消息开始读取;


  • BLOCK:阻塞读取;


敲黑板了


如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。


比如 consumer2 执行读取操作:


XREADGROUP GROUP 青龙门 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"


consumer2 不能再读取到 zhangsan 了,而是读取下一条 lisi 因为这条消息已经被 consumer1 读取了。


使用消费者的另一个目的可以让组内的多个消费者分担读取消息,也就是每个消费者读取部分消息,从而实现均衡负载。


比如一个消费组有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流:


image.png


XPENDING 查看已读未确认消息


为了保证消费者在消费的时候发生故障或者宕机重启后依然可以读取消息,Stream 内部有一个队列(pending List)保存每个消费者读取但是还没有执行 ACK 的消息


如果消费者使用了 XREADGROUP GROUP groupName consumerName 读取消息,但是没有给 Stream 发送 XACK 命令,消息依然保留。


比如查看 bossStream 中的 消费组「青龙门」中各个消费者已读取未确认的消息信息:


XPENDING bossStream 青龙门
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"


  1. 1)未确认消息条数;


  1. 2) ~ 3)青龙门中所有消费者读取的消息最小和最大 ID;


查看 consumer1读取了哪些数据,使用以下命令:


XPENDING bossStream 青龙门 - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1


ACK 确认


所以当接收到消息并且消费成功以后,我们需要手动 ACK 通知 Streams,这条消息就会被删除了。命令如下:


XACK bossStream 青龙门 1645957821396-0 1645957838700-0
(integer) 2


语法如下:


XACK key group-key ID [ID ...]


消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:


image.png


使用 Redisson 实战


使用 maven 添加依赖


<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>


添加 Redis 配置,码哥的 Redis 没有配置密码,大家根据实际情况配置即可。


spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false


@Slf4j
@Service
public class QueueService {
    @Autowired
    private RedissonClient redissonClient;
    /**
     * 发送消息到队列
     *
     * @param message
     */
    public void sendMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }
    /**
     * 消费者消费消息
     *
     * @param message
     */
    public void consumerMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.createGroup("sensors_data", StreamMessageId.ALL);
        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
          Map<String, String> msg = entry.getValue();
          System.out.println(msg);
          stream.ack("sensors_data", entry.getKey());
        }
    }
}


读者朋友阅读后有收获的话点赞、收藏并分享,感谢支持。利他利己利黎明百姓。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
74 6
|
12天前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
33 6
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
98 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
27 2
|
1月前
|
消息中间件 存储 NoSQL
如何用Redis实现延迟队列?
综上所述,通过Redis的有序集合和一些基本命令,我们可以轻松地构建出功能完善的延迟队列系统。根据具体需求,可以进一步优化和扩展,以满足高性能和高可靠性的业务需求。
34 1
|
1月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
37 0
|
5月前
|
安全 Java
java线程之List集合并发安全问题及解决方案
java线程之List集合并发安全问题及解决方案
911 1
|
4月前
|
Java API Apache
怎么在在 Java 中对List进行分区
本文介绍了如何将列表拆分为给定大小的子列表。尽管标准Java集合API未直接支持此功能,但Guava和Apache Commons Collections提供了相关API。
|
4月前
|
运维 关系型数据库 Java
PolarDB产品使用问题之使用List或Range分区表时,Java代码是否需要进行改动
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
4月前
|
存储 安全 Java
详解Java中集合的List接口实现的ArrayList方法 | Set接口实现的HashSet方法
详解Java中集合的List接口实现的ArrayList方法 | Set接口实现的HashSet方法