Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客,代码的世界里,每一行都是一个故事


前言

在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。

Redis Streams的基本概念和特性

Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:

1. 日志数据结构

  • 持久化的消息日志:Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置,并且有一个唯一的ID标识。
  • 可追溯性:由于其日志特性,Redis Streams允许你访问历史消息,这对于消息的追溯、重放和延迟处理非常有用。

2. 消息和字段

  • 消息结构:每条消息都可以包含一个或多个字段(field)和值(value)。这类似于一个小的哈希结构,使得每条消息可以携带多个相关的数据点。
  • 灵活的数据模型:你可以根据应用的需要自由定义每条消息包含的字段和数据格式。

3. 消费者组

  • 支持多消费者:Redis Streams可以被多个消费者或多个消费者组同时读取,每个消费者组都会跟踪其成员的进度。
  • 消息确认:消费者读取并处理消息后,可以发送确认,表示消息已被处理。未确认的消息可以被再次处理,确保消息不会因消费者失败而丢失。
  • 故障处理:支持挂起的消息列表和消费者超时检测,使得在消费者失败时可以由其他消费者接手处理消息。

4. 消息ID

  • 自动生成或指定:消息ID通常由Redis自动生成,保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。
  • 组成结构:ID由一个时间戳部分和一个序列号部分组成,格式为<时间戳>-<序列号>

5. 实时和历史数据处理

  • 实时消息处理:通过XREADXREADGROUP命令,你可以实时监听并处理新添加到流中的消息。
  • 历史消息查询:通过XRANGEXREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。

6. 性能和可靠性

  • 高性能:Redis Streams设计用于处理高吞吐量的消息,能够支持每秒数百万消息的读写。
  • 持久化:与Redis的其他数据类型一样,Streams的数据可以持久化到磁盘,保证了数据的持久性和可靠性。

实战

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

配置StreamConfig(监听)

package fun.bo.config;
import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.time.Duration;
/**
 * @author xiaobo
 */
@Configuration
public class StreamConfig {
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
        // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
                        .pollTimeout(Duration.ofMillis(100))
                        // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
                        .targetType(String.class)
                        .build();
        // 创建一个可用于监听Redis流的消息监听容器。
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);
        // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
        listenerContainer.receive(
                Consumer.from("your-consumer-group", "your-consumer-name"),
                StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);
        // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
        listenerContainer.start();
        return listenerContainer;
    }
}

配置生产者

package fun.bo.produce;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Service
@RequiredArgsConstructor
public class MessageProducer {
    private final RedisTemplate<String, String> redisTemplate;
    public void sendMessage(String streamKey, String messageKey, String message) {
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put(messageKey, message);
        RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
        if (recordId != null) {
            System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
        }
    }
}

配置消费者(组)

package fun.bo.consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;
/**
 * @author xiaobo
 */
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        String stream = message.getStream();
        String messageId = message.getId().toString();
        String messageBody = message.getValue();
        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
        System.out.println("Message body: " + messageBody);
    }
}

配置初始化方法

如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option

public void initializeStream() {
  StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
  // 创建一个流
  try {
    streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
  } catch (Exception e) {
    // 流可能已存在,忽略异常
  }
}

实现效果

基于List和专业消息队列对比

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:

相比于Redis List解决的痛点:

  1. 更好的消息顺序保证
  • List:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
  • Streams:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。
  1. 消费者组支持
  • List:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
  • Streams:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。
  1. 消息持久化和读取
  • List:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
  • Streams:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。
  1. 复杂的读取操作
  • List:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
  • Streams:提供了复杂的查询命令,如XRANGEXREVRANGE,可以按ID范围(时间范围)查询消息。
  1. 消息确认和重试
  • List:需要手动实现消息确认和重试机制,管理起来较为复杂。
  • Streams:提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能,使得消息的重试和故障处理更加容易。

相比于专业高级队列的不足:

  1. 事务和消息持久性保证
  • Redis Streams:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。
  1. 集群和分区
  • Redis Streams:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。
  1. 管理和监控工具
  • Redis Streams:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。
  1. 高级消息路由和过滤
  • Redis Streams:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。
  1. 消息传递语义
  • Redis Streams:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。

总结

Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。

相关文章
|
2月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
371 3
|
19天前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
2月前
|
人工智能 监控 安全
如何快速上手【Spring AOP】?核心应用实战(上篇)
哈喽大家好吖~欢迎来到Spring AOP系列教程的上篇 - 应用篇。在本篇,我们将专注于Spring AOP的实际应用,通过具体的代码示例和场景分析,帮助大家掌握AOP的使用方法和技巧。而在后续的下篇中,我们将深入探讨Spring AOP的实现原理和底层机制。 AOP(Aspect-Oriented Programming,面向切面编程)是Spring框架中的核心特性之一,它能够帮助我们解决横切关注点(如日志记录、性能统计、安全控制、事务管理等)的问题,提高代码的模块化程度和复用性。
|
3月前
|
NoSQL Java Redis
Redis基本数据类型及Spring Data Redis应用
Redis 是开源高性能键值对数据库,支持 String、Hash、List、Set、Sorted Set 等数据结构,适用于缓存、消息队列、排行榜等场景。具备高性能、原子操作及丰富功能,是分布式系统核心组件。
384 2
|
4月前
|
Cloud Native Java 微服务
Spring Boot 3.x 现代化应用开发实战技巧与最佳实践
本指南基于Spring Boot 3.x,融合微服务、云原生与响应式编程等前沿技术,打造现代化应用开发实践。通过构建智能电商平台案例,涵盖商品、订单、用户等核心服务,展示Spring WebFlux、OAuth 2.0认证、Spring Cloud Gateway路由、GraalVM原生编译等技术实现。同时提供Docker/Kubernetes部署方案及性能优化策略,助您掌握从开发到生产的全流程。代码示例详实,适合进阶开发者参考。
364 2
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
11月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
171 0
手撸MQ消息队列——循环数组