使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

简介: 使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收是很常见的场景,让我们逐步来看如何实现。

 

### 1. 配置Spring Boot项目

 

首先,确保你的Spring Boot项目已经配置好。可以使用Spring Initializr生成一个简单的Spring Boot项目,并在`pom.xml`中添加必要的依赖项。

```xml
    org.springframework.boot
    spring-boot-starter-activemq
 
    org.springframework.boot
    spring-boot-starter-mqtt
```

 

### 2. 配置ActiveMQ

 

在`application.properties`(或`application.yml`)中配置ActiveMQ的连接信息:

```properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
```

这里使用了默认的ActiveMQ连接信息,你可以根据自己的实际情况进行修改。

 

### 3. 配置MQTT

 

如果需要使用MQTT,也需要在`application.properties`中添加相关配置(可选,如果不需要可以忽略):


同样,这里的连接信息也是默认的,你需要根据自己的MQTT代理配置进行修改。

 

### 4. 编写消息发送者

 

创建一个消息发送者组件,用来发送消息到ActiveMQ和MQTT:

 

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
@Component
public class MessageSender {
 
    @Autowired
    private JmsTemplate jmsTemplate; // For ActiveMQ
 
    @Autowired(required = false) // Optional: For MQTT
    private MqttPahoMessageHandler mqttPahoMessageHandler;
 
    public void sendMessageToActiveMQ(String destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
 
    public void sendMessageToMQTT(String topic, String message) {
        Message mqttMessage = MessageBuilder.withPayload(message).build();
        mqttPahoMessageHandler.handleMessage(mqttMessage);
    }
}
```

### 5. 编写消息接收者

 

创建消息接收者组件,用来从ActiveMQ和MQTT接收消息:

 

```java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
public class MessageReceiver {
 
    @JmsListener(destination = "queue-name") // For ActiveMQ
    public void receiveMessageFromActiveMQ(String message) {
        System.out.println("Received message from ActiveMQ: " + message);
    }
 
    @MqttListener(topic = "topic-name") // For MQTT
    public void receiveMessageFromMQTT(@Payload String message) {
        System.out.println("Received message from MQTT: " + message);
    }
}
```

### 6. 示例用法

 

现在可以在你的服务或控制器中使用 `MessageSender` 发送消息,并且通过 `MessageReceiver` 接收消息了:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class MessagingController {
 
    @Autowired
    private MessageSender messageSender;
 
    @GetMapping("/send")
    public String sendMessage() {
        messageSender.sendMessageToActiveMQ("queue-name", "Hello from ActiveMQ!");
        messageSender.sendMessageToMQTT("topic-name", "Hello from MQTT!");
        return "Messages sent!";
    }
}
```

### 注意事项

 

- 确保ActiveMQ和MQTT代理(例如Mosquitto)已经正确安装和运行,并且端口和认证信息正确配置。

- 在实际部署中,考虑安全性、消息序列化和异常处理等因素。

 

### 1. ActiveMQ 配置详解

 

- **持久化配置**: 可以配置ActiveMQ以使用持久化存储,确保消息不会在代理关闭时丢失。可以通过配置`spring.activemq.broker-url`来指定ActiveMQ的连接URL。

- **队列和主题**: ActiveMQ支持队列(Queue)和主题(Topic)。队列用于点对点通信,主题用于发布-订阅模式。在`@JmsListener`中指定`destination`即可监听队列或主题。

 

### 2. MQTT 配置详解

 

- **QoS等级**: MQTT支持不同的服务质量(QoS)等级,包括0、1和2,可以在发送消息时设置适当的QoS级别。

- **持久会话**: MQTT客户端可以创建持久会话,确保即使客户端离线,消息也能够被保留和传递给客户端重新连接时。

- **TLS加密**: 如果需要安全通信,可以配置MQTT使用TLS/SSL加密。

 

### 3. 异常处理和重试

 

- 在生产环境中,确保添加适当的异常处理和重试机制,以处理连接错误、发送超时等情况。可以使用Spring的`RetryTemplate`来实现重试逻辑。

- 对于不同的消息队列系统(ActiveMQ和MQTT),可能需要定制化的异常处理策略。

 

### 4. 性能优化和监控

 

- 监控消息队列的性能和负载情况,以确保系统在高负载时能够稳定运行。可以使用JMX或者专业的监控工具。

- 针对高并发情况,考虑优化消息处理和线程管理策略,避免潜在的性能瓶颈。

 

### 5. 安全性考虑

 

- 配置消息队列的访问控制和认证机制,以确保只有授权的应用程序可以访问和发送消息。

- 对于敏感数据,考虑使用加密技术来保护消息内容的安全传输。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11月前
|
消息中间件 存储 Java
第15课: Spring Boot中集成ActiveMQ
第15课: Spring Boot中集成ActiveMQ
691 0
|
9月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2987 1
|
9月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
654 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
9月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
510 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
8月前
|
XML JSON Java
【SpringBoot(三)】从请求到响应再到视图解析与模板引擎,本文带你领悟SpringBoot请求接收全流程!
Springboot专栏第三章,从请求的接收到视图解析,再到thymeleaf模板引擎的使用! 本文带你领悟SpringBoot请求接收到渲染的使用全流程!
603 3
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
391 32
|
11月前
|
人工智能 JSON Java
Spring Boot 如何接收并处理不确定类型的请求参数?
在 Spring Boot 中,当需要处理结构不确定的 JSON 数据时,可以使用 `Map` 类型灵活接收键值对数据。对于更复杂的场景,可通过 Jackson 注解支持多态类型、自定义反序列化器,或在接收后动态解析 JSON 数据,提升处理灵活性和扩展性。
363 0
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
2585 0