Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客,代码的世界里,每一行都是一个故事


前言

在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。

Redis Streams的基本概念和特性

Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:

1. 日志数据结构

  • 持久化的消息日志:Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置,并且有一个唯一的ID标识。
  • 可追溯性:由于其日志特性,Redis Streams允许你访问历史消息,这对于消息的追溯、重放和延迟处理非常有用。

2. 消息和字段

  • 消息结构:每条消息都可以包含一个或多个字段(field)和值(value)。这类似于一个小的哈希结构,使得每条消息可以携带多个相关的数据点。
  • 灵活的数据模型:你可以根据应用的需要自由定义每条消息包含的字段和数据格式。

3. 消费者组

  • 支持多消费者:Redis Streams可以被多个消费者或多个消费者组同时读取,每个消费者组都会跟踪其成员的进度。
  • 消息确认:消费者读取并处理消息后,可以发送确认,表示消息已被处理。未确认的消息可以被再次处理,确保消息不会因消费者失败而丢失。
  • 故障处理:支持挂起的消息列表和消费者超时检测,使得在消费者失败时可以由其他消费者接手处理消息。

4. 消息ID

  • 自动生成或指定:消息ID通常由Redis自动生成,保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。
  • 组成结构:ID由一个时间戳部分和一个序列号部分组成,格式为<时间戳>-<序列号>

5. 实时和历史数据处理

  • 实时消息处理:通过XREADXREADGROUP命令,你可以实时监听并处理新添加到流中的消息。
  • 历史消息查询:通过XRANGEXREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。

6. 性能和可靠性

  • 高性能:Redis Streams设计用于处理高吞吐量的消息,能够支持每秒数百万消息的读写。
  • 持久化:与Redis的其他数据类型一样,Streams的数据可以持久化到磁盘,保证了数据的持久性和可靠性。

实战

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

配置StreamConfig(监听)

package fun.bo.config;
import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import java.time.Duration;
/**
 * @author xiaobo
 */
@Configuration
public class StreamConfig {
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {
        // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
                        .pollTimeout(Duration.ofMillis(100))
                        // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
                        .targetType(String.class)
                        .build();
        // 创建一个可用于监听Redis流的消息监听容器。
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);
        // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
        listenerContainer.receive(
                Consumer.from("your-consumer-group", "your-consumer-name"),
                StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);
        // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
        listenerContainer.start();
        return listenerContainer;
    }
}

配置生产者

package fun.bo.produce;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Service
@RequiredArgsConstructor
public class MessageProducer {
    private final RedisTemplate<String, String> redisTemplate;
    public void sendMessage(String streamKey, String messageKey, String message) {
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put(messageKey, message);
        RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
        if (recordId != null) {
            System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
        }
    }
}

配置消费者(组)

package fun.bo.consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;
/**
 * @author xiaobo
 */
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        String stream = message.getStream();
        String messageId = message.getId().toString();
        String messageBody = message.getValue();
        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
        System.out.println("Message body: " + messageBody);
    }
}

配置初始化方法

如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option

public void initializeStream() {
  StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
  // 创建一个流
  try {
    streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
  } catch (Exception e) {
    // 流可能已存在,忽略异常
  }
}

实现效果

基于List和专业消息队列对比

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:

相比于Redis List解决的痛点:

  1. 更好的消息顺序保证
  • List:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
  • Streams:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。
  1. 消费者组支持
  • List:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
  • Streams:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。
  1. 消息持久化和读取
  • List:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
  • Streams:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。
  1. 复杂的读取操作
  • List:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
  • Streams:提供了复杂的查询命令,如XRANGEXREVRANGE,可以按ID范围(时间范围)查询消息。
  1. 消息确认和重试
  • List:需要手动实现消息确认和重试机制,管理起来较为复杂。
  • Streams:提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能,使得消息的重试和故障处理更加容易。

相比于专业高级队列的不足:

  1. 事务和消息持久性保证
  • Redis Streams:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。
  1. 集群和分区
  • Redis Streams:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。
  1. 管理和监控工具
  • Redis Streams:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。
  1. 高级消息路由和过滤
  • Redis Streams:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。
  1. 消息传递语义
  • Redis Streams:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。

总结

Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
52 4
|
20天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
53 5
|
26天前
|
消息中间件 监控 NoSQL
Redis脑裂问题详解及解决方案
Redis脑裂问题是分布式系统中常见的复杂问题,合理配置Redis Sentinel、使用保护模式、采用分布式锁机制以及优化网络和客户端连接策略等措施,可以有效预防和解决脑裂问题。通过深入理解Redis脑裂问题的成因和影响,采取相应的解决方案,能够提高系统的可用性和数据一致性,保障Redis集群的稳定运行。希望本文能帮助你更好地理解和应对Redis脑裂问题。
31 2
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
40 5
|
1月前
|
消息中间件 NoSQL Java
Spring Boot整合Redis
通过Spring Boot整合Redis,可以显著提升应用的性能和响应速度。在本文中,我们详细介绍了如何配置和使用Redis,包括基本的CRUD操作和具有过期时间的值设置方法。希望本文能帮助你在实际项目中高效地整合和使用Redis。
57 2
|
2月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
91 5
|
2月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
132 2
|
1月前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
47 0
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。