深入学习Redis 消息队列 Stream

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式

博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

1686811917950.jpg

1、什么是 Stream

Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式。

2、为什么要设计 Stream

Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中,消息的消费者只能消费最新的消息,而无法消费过去的消息。而在实时数据处理中,往往需要对历史数据进行分析和处理,因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题,它支持持久化存储和快速查询,可以存储大量历史数据,并且支持多个消费者组独立消费消息,从而满足了大规模实时数据处理的需求。此外,Stream 还支持消息的延迟和重试等功能,使得 Redis 在消息队列领域更具竞争力。

3、Stream 命令详解

Stream 是 Redis 5.0 版本新增的一种数据结构,支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍:

  1. XADD key ID field1 value1 [field2 value2 ...]:向指定的 Stream 中添加一条消息,消息的 ID 由用户指定,消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
  1. XRANGE key start end [COUNT count]:返回指定 Stream 中指定范围内的消息,范围由 start 和 end 指定,可以使用 "-" 表示最大或最小 ID,COUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
      2) 1) "name"
         2) "John"
         3) "age"
         4) "30"
  1. XREVRANGE key end start [COUNT count]:同 XRANGE 命令,但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
      2) 1) "name"
         2) "Jack"
         3) "age"
         4) "30"
   2) 1) "1002-0"
      2) 1) "name"
         2) "Mary"
         3) "age"
         4) "28"
   3) 1) "1001-0"
      2) 1) "name"
         2) "Tom"
         3) "age"
         4) "25"
  1. XLEN key:返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定 Stream 中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
      2) 1) "1002-0"
         2) 1) "name"
            2) "Mary"
            3) "age"
            4) "28"
  1. XACK key group ID [ID ...]:确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
  1. XGROUP CREATE key groupname ID [MKSTREAM]:创建一个新的消费者组,可以指定组名和起始 ID,如果指定 MKSTREAM 参数,则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
  1. XGROUP SETID key groupname ID:设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
  1. XGROUP DESTROY key groupname:销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
  1. XGROUP DELCONSUMER key groupname consumername:从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
  1. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定消费者组中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1000-0"
         2) 1) "name"
            2) "John"
            3) "age"
            4) "30"
  1. XCLAIM key groupname consumername min-idle-time ID [ID ...] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]:从指定消费者组中获取一条未被确认的消息,并将其标记为正在处理,可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
  1. XDEL key ID [ID ...]:从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
  1. XTRIM key MAXLEN [~] count:删除 Stream 中多余的消息,可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"

127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2

4、java 写一点 Stream 的 demo

我这边 redis 的版本是 Redis-x64-5.0.14.1,windows 上玩的,绿色版的,下载地址https://github.com/tporadowski/redis/releases

image.png

一下是 demo 的代码:

package com.pany.camp.redis;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.*;

public class RedisStreamDemo {
   
   
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
    private static final int MESSAGE_READ_COUNT = 1;
    private static final long MESSAGE_READ_TIMEOUT = 120000L;

    public static void main(String[] args) {
   
   
        // 创建 Jedis 实例
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
   
   
            // 定义 Stream 名称和消费者组名称
            String streamName = "mystream";
            String groupName = "mygroup5";
            // 创建消费者组
            try {
   
   
                jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
            } catch (JedisDataException e) {
   
   
                // 如果 Stream 已经存在,则忽略异常
                if (!e.getMessage().contains("BUSYGROUP")) {
   
   
                    throw e;
                }
            }
            logger.info("消费者组已创建");
            // 添加消息到 Stream 中
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
            logger.info("消息已添加到 Stream 中,消息内容为:{}", JSONObject.toJSONString(fields));
            // 读取消息
            Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
                    MESSAGE_READ_TIMEOUT, true, streams);
            logger.info("从 Stream 中读取了 {} 条消息", messages.size());
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
   
   
                String sn = entry.getKey();
                List<StreamEntry> streamMessages = entry.getValue();
                for (StreamEntry message : streamMessages) {
   
   
                    logger.info("Stream 名称:{}", sn);
                    logger.info("Message ID:{}", message.getID());
                    logger.info("Message fields:{}", message.getFields());
                }
            }
            // 确认消息已经被消费
            jedis.xack(streamName, groupName, messageId);
            logger.info("消息已确认消费");
            // 删除消息
            jedis.xdel(streamName, messageId);
            logger.info("消息已删除");
        } catch (Exception e) {
   
   
            logger.error("执行 Redis 操作出错", e);
        }
    }
}
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

image.png

5、Stream 的应用场景

Redis Stream 的常用的应用场景:

  1. 消息队列:Stream 可以作为一个高性能的消息队列使用,支持多个消费者对同一 Stream 进行消费,且支持消费者组的管理、消息确认和消息持久化等功能。

  2. 日志收集:Stream 可以作为一个分布式的日志收集系统使用,支持多个客户端将日志写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  3. 实时数据处理:Stream 可以作为一个实时数据处理系统使用,支持多个客户端将实时数据写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  4. 事件驱动架构:Stream 可以作为一个事件驱动架构的基础设施使用,支持多个事件源将事件写入到同一 Stream 中,且支持按照事件类型和时间戳进行查询和过滤。

1686494501743.jpg

💕💕 本文由激流丶原创,原创不易,感谢支持!
💕💕喜欢的话记得点赞收藏啊!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
61 0
|
29天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
73 20
剖析 Redis List 消息队列的三种消费线程模型
|
9天前
|
消息中间件 NoSQL 中间件
19)消息队列的终极解决方案 Stream
19)消息队列的终极解决方案 Stream
22 0
|
2月前
|
缓存 NoSQL 关系型数据库
Redis学习总结
Redis学习总结
33 1
|
2月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
49 0
|
2月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
53 0
|
3月前
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
72 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
25天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
25 0
手撸MQ消息队列——循环数组