【Spring Cloud + RabbitMQ 实现分布式消息总线】—— 每天一点小知识

简介: 【Spring Cloud + RabbitMQ 实现分布式消息总线】—— 每天一点小知识

🐳使用 Spring Cloud + RabbitMQ 实现分布式消息总线

Spring Cloud 是一个用于构建分布式系统的开发工具包,而 RabbitMQ 是一种功能强大的消息代理。结合使用 Spring Cloud 和 RabbitMQ,我们可以实现一个强大的分布式消息总线。本文将介绍如何使用 Spring Cloud Bus 和 RabbitMQ 实现以下功能:

  1. 配置刷新
  2. 事件广播
  3. 服务监控
  4. 微服务间通信

准备工作

 💧在开始之前,确保您已经完成以下准备工作:

  • 安装并配置 RabbitMQ。
  • 创建一个 Spring Cloud 微服务项目。

配置刷新

 💧配置刷新是一种实现动态配置更新的功能。当配置发生变化时,我们希望能够自动刷新微服务的配置,而不需要重启服务。下面是实现配置刷新的步骤:

  1. 在您的 Spring Cloud 微服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息,在 application.properties 文件中添加以下配置:
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 在您的微服务的配置类上添加 @RefreshScope 注解,例如:
@RestController
@RefreshScope
public class MyController {
    // Controller code here
}
  1. 配置 RabbitMQ 发送消息的端点。在 application.properties 文件中添加以下配置:
management.endpoints.web.exposure.include=bus-refresh
  1. 启动您的微服务应用程序,并进行配置更改。
  2. 使用 POST 请求访问 /actuator/bus-refresh 端点,例如:http://localhost:8080/actuator/bus-refresh。这将触发配置刷新并更新微服务的配置。

事件广播

 💧事件广播是一种将事件消息广播给所有微服务实例的功能。下面是实现事件广播的步骤:

  1. 在您的 Spring Cloud 微服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息,在 application.properties 文件中添加以下配置:
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 创建一个事件消息发布者,用于发送事件消息。例如,创建一个名为 EventPublisher 的类:
import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@Component
public class EventPublisher {
    private final ApplicationEventPublisher eventPublisher;
    private final BusProperties busProperties;
    public EventPublisher(ApplicationEventPublisher eventPublisher, BusProperties busProperties) {
        this.eventPublisher = eventPublisher;
        this.busProperties = busProperties;
    }
    public void publishEvent(Object event) {
        String originService = busProperties.getId();
        RemoteApplicationEvent remoteEvent = new RemoteApplicationEvent(event, originService, null);
        eventPublisher.publishEvent(remoteEvent);
    }
}
  1. 在需要触发事件广播的地方,使用 EventPublisher 发布事件消息。例如,在一个 REST Controller 中:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyController {
    private final EventPublisher eventPublisher;
    public MyController(EventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    @GetMapping("/trigger-event")
    public String triggerEvent() {
        // 创建事件对象
        MyEvent event = new MyEvent("Hello, world!");
        // 发布事件消息
        eventPublisher.publishEvent(event);
        return "Event triggered";
    }
}
  1. 启动多个微服务实例,并访问其中一个实例的 /trigger-event 路径。事件消息将被广播到所有微服务实例,每个实例都能接收到该事件并执行相应的操作。

服务监控

 💧服务监控可以实现对微服务实例的监控和告警。下面是实现服务监控的步骤:

  1. 在您的 Spring Cloud 微服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息,在 application.properties 文件中添加以下配置:
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 配置 RabbitMQ 发送消息的端点。在 application.properties 文件中添加以下配置:
management.endpoints.web.exposure.include=bus-health, bus-env
  1. 启动您的微服务应用程序,并访问 /actuator/bus-health/actuator/bus-env 端点,例如:http://localhost:8080/actuator/bus-healthhttp://localhost:8080/actuator/bus-env。这将提供有关微服务实例的健康状态信息和环境变量信息。
  2. 配置监控系统以订阅并处理 RabbitMQ 发送的监控消息,实现实时监控和告警功能。

微服务间通信

 💧微服务间通信是通过消息传递实现解耦和异步处理的方式。下面是实现微服务间通信的步骤:

  1. 在您的 Spring Cloud 微服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  1. 配置 RabbitMQ 连接信息,在 application.properties 文件中添加以下配置:
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 创建一个消息消费者,用于处理接收到的消息。例如,创建一个名为 MessageConsumer 的类:
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer implements ApplicationListener<RemoteApplicationEvent> {
    @Override
    public void onApplicationEvent(RemoteApplicationEvent event) {
        if (event instanceof MyMessageEvent) {
            MyMessageEvent messageEvent = (MyMessageEvent) event;
            // 处理接收到的消息
            handleMessage(messageEvent.getMessage());
        }
    }
    private void handleMessage(String message) {
        // 处理消息的逻辑
        System.out.println("Received message: " + message);
    }
}
  1. 创建一个消息发布者,用于发送消息给其他微服务。例如,创建一个名为 MessagePublisher 的类:
import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@Component
public class MessagePublisher {
    private final ApplicationEventPublisher eventPublisher;
    private final BusProperties busProperties;
    public MessagePublisher(ApplicationEventPublisher eventPublisher, BusProperties busProperties) {
        this.eventPublisher = eventPublisher;
        this.busProperties = busProperties;
    }
    public void publishMessage(String message) {
        String originService = busProperties.getId();
        MyMessageEvent messageEvent = new MyMessageEvent(message, originService);
        eventPublisher.publishEvent(messageEvent);
    }
}
  1. 创建一个自定义的消息事件类,用于封装消息内容。例如,创建一个名为 MyMessageEvent 的类:
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
public class MyMessageEvent extends RemoteApplicationEvent {
    private String message;
    // 构造函数、getter、setter 省略...
    public MyMessageEvent(String message, String originService) {
        super(message, originService);
        this.message = message;
    }
}
  1. 在需要发送消息的地方,使用 MessagePublisher 发布消息。例如,在一个 REST Controller 中:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyController {
    private final MessagePublisher messagePublisher;
    public MyController(MessagePublisher messagePublisher) {
        this.messagePublisher = messagePublisher;
    }
    @PostMapping("/send-message")
    public String sendMessage(@RequestBody String message) {
        // 发送消息给其他微服务
        messagePublisher.publishMessage(message);
        return "Message sent";
    }
}
  1. 启动多个微服务实例,并访问其中一个实例的 /send-message 路径,发送消息给其他微服务。其他微服务的 MessageConsumer 将接收到消息并进行处理。

总结

通过完成上述步骤,你可以结合 Spring Cloud 和 RabbitMQ 实现配置刷新、事件广播、服务监控以及微服务间通信的功能。这些功能可以提供更强大的分布式系统能力,并帮助实现解耦、异步处理和实时监控的目标。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
126 1
|
27天前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
1月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
2月前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
2月前
|
消息中间件 Java 对象存储
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
52 2
|
2月前
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
92 2
|
3月前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
4月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
505 15