深入学习Redis 消息队列 Stream

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式

博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

1686811917950.jpg

1、什么是 Stream

Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式。

2、为什么要设计 Stream

Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中,消息的消费者只能消费最新的消息,而无法消费过去的消息。而在实时数据处理中,往往需要对历史数据进行分析和处理,因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题,它支持持久化存储和快速查询,可以存储大量历史数据,并且支持多个消费者组独立消费消息,从而满足了大规模实时数据处理的需求。此外,Stream 还支持消息的延迟和重试等功能,使得 Redis 在消息队列领域更具竞争力。

3、Stream 命令详解

Stream 是 Redis 5.0 版本新增的一种数据结构,支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍:

  1. XADD key ID field1 value1 [field2 value2 ...]:向指定的 Stream 中添加一条消息,消息的 ID 由用户指定,消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
  1. XRANGE key start end [COUNT count]:返回指定 Stream 中指定范围内的消息,范围由 start 和 end 指定,可以使用 "-" 表示最大或最小 ID,COUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
      2) 1) "name"
         2) "John"
         3) "age"
         4) "30"
  1. XREVRANGE key end start [COUNT count]:同 XRANGE 命令,但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
      2) 1) "name"
         2) "Jack"
         3) "age"
         4) "30"
   2) 1) "1002-0"
      2) 1) "name"
         2) "Mary"
         3) "age"
         4) "28"
   3) 1) "1001-0"
      2) 1) "name"
         2) "Tom"
         3) "age"
         4) "25"
  1. XLEN key:返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定 Stream 中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
      2) 1) "1002-0"
         2) 1) "name"
            2) "Mary"
            3) "age"
            4) "28"
  1. XACK key group ID [ID ...]:确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
  1. XGROUP CREATE key groupname ID [MKSTREAM]:创建一个新的消费者组,可以指定组名和起始 ID,如果指定 MKSTREAM 参数,则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
  1. XGROUP SETID key groupname ID:设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
  1. XGROUP DESTROY key groupname:销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
  1. XGROUP DELCONSUMER key groupname consumername:从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
  1. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定消费者组中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1000-0"
         2) 1) "name"
            2) "John"
            3) "age"
            4) "30"
  1. XCLAIM key groupname consumername min-idle-time ID [ID ...] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]:从指定消费者组中获取一条未被确认的消息,并将其标记为正在处理,可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
  1. XDEL key ID [ID ...]:从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
  1. XTRIM key MAXLEN [~] count:删除 Stream 中多余的消息,可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"

127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2

4、java 写一点 Stream 的 demo

我这边 redis 的版本是 Redis-x64-5.0.14.1,windows 上玩的,绿色版的,下载地址https://github.com/tporadowski/redis/releases

image.png

一下是 demo 的代码:

package com.pany.camp.redis;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.*;

public class RedisStreamDemo {
   
   
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
    private static final int MESSAGE_READ_COUNT = 1;
    private static final long MESSAGE_READ_TIMEOUT = 120000L;

    public static void main(String[] args) {
   
   
        // 创建 Jedis 实例
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
   
   
            // 定义 Stream 名称和消费者组名称
            String streamName = "mystream";
            String groupName = "mygroup5";
            // 创建消费者组
            try {
   
   
                jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
            } catch (JedisDataException e) {
   
   
                // 如果 Stream 已经存在,则忽略异常
                if (!e.getMessage().contains("BUSYGROUP")) {
   
   
                    throw e;
                }
            }
            logger.info("消费者组已创建");
            // 添加消息到 Stream 中
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
            logger.info("消息已添加到 Stream 中,消息内容为:{}", JSONObject.toJSONString(fields));
            // 读取消息
            Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
                    MESSAGE_READ_TIMEOUT, true, streams);
            logger.info("从 Stream 中读取了 {} 条消息", messages.size());
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
   
   
                String sn = entry.getKey();
                List<StreamEntry> streamMessages = entry.getValue();
                for (StreamEntry message : streamMessages) {
   
   
                    logger.info("Stream 名称:{}", sn);
                    logger.info("Message ID:{}", message.getID());
                    logger.info("Message fields:{}", message.getFields());
                }
            }
            // 确认消息已经被消费
            jedis.xack(streamName, groupName, messageId);
            logger.info("消息已确认消费");
            // 删除消息
            jedis.xdel(streamName, messageId);
            logger.info("消息已删除");
        } catch (Exception e) {
   
   
            logger.error("执行 Redis 操作出错", e);
        }
    }
}
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

image.png

5、Stream 的应用场景

Redis Stream 的常用的应用场景:

  1. 消息队列:Stream 可以作为一个高性能的消息队列使用,支持多个消费者对同一 Stream 进行消费,且支持消费者组的管理、消息确认和消息持久化等功能。

  2. 日志收集:Stream 可以作为一个分布式的日志收集系统使用,支持多个客户端将日志写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  3. 实时数据处理:Stream 可以作为一个实时数据处理系统使用,支持多个客户端将实时数据写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  4. 事件驱动架构:Stream 可以作为一个事件驱动架构的基础设施使用,支持多个事件源将事件写入到同一 Stream 中,且支持按照事件类型和时间戳进行查询和过滤。

1686494501743.jpg

💕💕 本文由激流丶原创,原创不易,感谢支持!
💕💕喜欢的话记得点赞收藏啊!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8天前
|
消息中间件 NoSQL Java
springboot redis 实现消息队列
springboot redis 实现消息队列
71 1
|
7天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
8天前
|
消息中间件 存储 负载均衡
Redis类型 Stream Bitfield
Redis类型 Stream Bitfield
15 0
|
8天前
|
消息中间件 SQL
7、stream消息队列
7、stream消息队列
|
8天前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
8天前
|
消息中间件 监控 NoSQL
使用redis做消息队列
使用redis做消息队列
38 0
|
8天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
21 0
|
8天前
|
消息中间件 存储 NoSQL
Redis Stream: 实时消息处理的利器,让你的数据流畅又可靠!
Redis Stream: 实时消息处理的利器,让你的数据流畅又可靠!
|
8天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
24 2
|
消息中间件 NoSQL Redis
Redis Stream简介
## 背景 Redis Stream是一个作者已经谋划多年的feature,本质是一个消息队列,但是和kafka、RocketMq等消息中间件相比也有其独特之处。Redis Stream本来是计划放在4.0这个大版本中发布(原计划4.2),但是由于确实是个比较重磅的feature,对内核的改动也比较大,目前已经提升到Redis 5.0发布,根据作者Twitter的消息,不出意外,18年上半年
3381 0