Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

简介: Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站零基础入门的AI学习网站~。

1为什么要使用MQ?


在 Spring Boot Event这篇文章中 已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?


首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。


通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

源码地址:

https://gitee.com/sparkle3021/springboot3-study


2整合RocketMQ

依赖版本


JDK 17


Spring Boot 3.2.0


RocketMQ-Client 5.0.4


RocketMQ-Starter 2.2.0


Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>


解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:

参考配置文件
# RocketMQ 配置
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: event-mq-group
    # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
    pull-batch-size: 1
  producer:
    # 发送同一类消息的设置为同一个group,保证唯一
    group: event-mq-group
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false


方法一 :通过 @Import(RocketMQAutoConfiguration.class) 在配置类中引入

方法二:在resources资源目录下创建文件夹及文件 META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports。

文件内容为RocketMQ自动配置类路径: org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
 
/**
 * 启动类
 */
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQEventApplication.class, args);
    }
}


RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
 
import java.io.Serializable;
import java.util.List;
 
/**
 * RocketMQ 消息
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {
 
    /**
     * 消息队列主题
     */
    @NotBlank(message = "MQ Topic 不能为空")
    private String topic;
 
    /**
     * 延迟级别
     */
    @Builder.Default
    private DelayLevel delayLevel = DelayLevel.OFF;
 
    /**
     * 消息体
     */
    private T message;
 
    /**
     * 消息体
     */
    private List<T> messages;
 
    /**
     * 使用有序消息发送时,指定发送到队列
     */
    private String hashKey;
 
    /**
     * 任务Id,用于日志打印相关信息
     */
    @Builder.Default
    private String taskId = IdUtil.fastSimpleUUID();
}


RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
/**
 * RocketMQ 消息工具类
 */
@Slf4j
@Component
public class RocketMQService {
 
    @Resource
    private RocketMQTemplate rocketMQTemplate;
 
    @Value("${rocketmq.producer.sendMessageTimeout}")
    private int sendMessageTimeout;
 
    /**
     * 异步发送消息回调
     *
     * @param taskId 任务Id
     * @param topic  消息主题
     * @return the send callback
     */
    private static SendCallback asyncSendCallback(String taskId, String topic) {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
            }
 
            @Override
            public void onException(Throwable throwable) {
                log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
            }
        };
    }
 
    /**
     * 发送同步消息,使用有序发送请设置HashKey
     *
     * @param message 消息参数
     */
    public <T> void syncSend(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
        SendResult sendResult;
        if (StringUtils.isNotBlank(message.getHashKey())) {
            sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
        } else {
            sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
        }
        log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }
 
    /**
     * 批量发送同步消息
     *
     * @param message 消息参数
     */
    public <T> void syncSendBatch(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        SendResult sendResult;
        if (StringUtils.isNotBlank(message.getHashKey())) {
            sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
        } else {
            sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
        }
        log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
    }
 
    /**
     * 异步发送消息,异步返回消息结果
     *
     * @param message 消息参数
     */
    public <T> void asyncSend(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        } else {
            rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
        }
    }
 
    /**
     * 批量异步发送消息
     *
     * @param message 消息参数
     */
    public <T> void asyncSendBatch(RocketMQMessage<T> message) {
        log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        } else {
            rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
                    asyncSendCallback(message.getTaskId(), message.getTopic()));
        }
    }
 
    /**
     * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
     *
     * @param message 消息参数
     */
    public <T> void sendOneWay(RocketMQMessage<T> message) {
        sendOneWay(message, false);
    }
 
    /**
     * 单向消息 - 批量发送
     *
     * @param message 消息体
     * @param batch   是否为批量操作
     */
    public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
        log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
                        : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
                message.getTaskId(), message.getTopic(), message.getMessages().size());
        if (StringUtils.isNotBlank(message.getHashKey())) {
            if (batch) {
                message.getMessages().
                        forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
            } else {
                rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
            }
        } else {
            if (batch) {
                message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
            } else {
                rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
            }
        }
    }
}


定义RocketMQ消费者
import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
 
/**
 * MQ消息监听
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
        consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("MQListener 接收消息 : {}", message);
    }
}
定义测试类发送消息
import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
 
/**
 * MQ测试
 */
@SpringBootTest
public class MQTest {
 
    @Resource
    private RocketMQService rocketMQService;
 
    @Test
    public void sendMessage() {
        int count = 1;
        while (count <= 50) {
            rocketMQService.syncSend(RocketMQMessage.builder()
                    .topic(MQConfig.EVENT_TOPIC)
                    .message(count++)
                    .build());
        }
        // 休眠等待消费消息
        ThreadUtil.sleep(2000L);
    }
}


4测试

总结

感谢您的阅读~




相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
人工智能 Java Nacos
基于 Spring AI Alibaba + Nacos 的分布式 Multi-Agent 构建指南
本文将针对 Spring AI Alibaba + Nacos 的分布式多智能体构建方案展开介绍,同时结合 Demo 说明快速开发方法与实际效果。
2207 63
|
3月前
|
存储 安全 Java
管理 Spring 微服务中的分布式会话
在微服务架构中,管理分布式会话是确保用户体验一致性和系统可扩展性的关键挑战。本文探讨了在 Spring 框架下实现分布式会话管理的多种方法,包括集中式会话存储和客户端会话存储(如 Cookie),并分析了它们的优缺点。同时,文章还涵盖了与分布式会话相关的安全考虑,如数据加密、令牌验证、安全 Cookie 政策以及服务间身份验证。此外,文中强调了分布式会话在提升系统可扩展性、增强可用性、实现数据一致性及优化资源利用方面的显著优势。通过合理选择会话管理策略,结合 Spring 提供的强大工具,开发人员可以在保证系统鲁棒性的同时,提供无缝的用户体验。
|
4月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
814 3
|
2月前
|
负载均衡 Java API
《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
|
3月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
258 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
6月前
|
Java API 数据库
JPA简介:Spring Boot环境下的实践指南
上述内容仅是JPA在Spring Boot环境下使用的冰山一角,实际的实践中你会发现更深更广的应用。总而言之,只要掌握了JPA的规则,你就可以借助Spring Boot无比丰富的功能,娴熟地驾驶这台高性能的跑车,在属于你的程序世界里驰骋。
241 15
|
7月前
|
人工智能 负载均衡 Java
Spring AI Alibaba 发布企业级 MCP 分布式部署方案
本文介绍了Spring AI Alibaba MCP的开发与应用,旨在解决企业级AI Agent在分布式环境下的部署和动态更新问题。通过集成Nacos,Spring AI Alibaba实现了流量负载均衡及节点变更动态感知等功能。开发者可方便地将企业内部业务系统发布为MCP服务或开发自己的AI Agent。文章详细描述了如何通过代理应用接入存量业务系统,以及全新MCP服务的开发流程,并提供了完整的配置示例和源码链接。未来,Spring AI Alibaba计划结合Nacos3的mcp-registry与mcp-router能力,进一步优化Agent开发体验。
2588 14
|
10月前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
1018 83
|
7月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
704 4