使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

简介: 使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收是很常见的场景,让我们逐步来看如何实现。

 

### 1. 配置Spring Boot项目

 

首先,确保你的Spring Boot项目已经配置好。可以使用Spring Initializr生成一个简单的Spring Boot项目,并在`pom.xml`中添加必要的依赖项。

```xml
    org.springframework.boot
    spring-boot-starter-activemq
 
    org.springframework.boot
    spring-boot-starter-mqtt
```

 

### 2. 配置ActiveMQ

 

在`application.properties`(或`application.yml`)中配置ActiveMQ的连接信息:

```properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
```

这里使用了默认的ActiveMQ连接信息,你可以根据自己的实际情况进行修改。

 

### 3. 配置MQTT

 

如果需要使用MQTT,也需要在`application.properties`中添加相关配置(可选,如果不需要可以忽略):


同样,这里的连接信息也是默认的,你需要根据自己的MQTT代理配置进行修改。

 

### 4. 编写消息发送者

 

创建一个消息发送者组件,用来发送消息到ActiveMQ和MQTT:

 

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
@Component
public class MessageSender {
 
    @Autowired
    private JmsTemplate jmsTemplate; // For ActiveMQ
 
    @Autowired(required = false) // Optional: For MQTT
    private MqttPahoMessageHandler mqttPahoMessageHandler;
 
    public void sendMessageToActiveMQ(String destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
 
    public void sendMessageToMQTT(String topic, String message) {
        Message mqttMessage = MessageBuilder.withPayload(message).build();
        mqttPahoMessageHandler.handleMessage(mqttMessage);
    }
}
```

### 5. 编写消息接收者

 

创建消息接收者组件,用来从ActiveMQ和MQTT接收消息:

 

```java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
public class MessageReceiver {
 
    @JmsListener(destination = "queue-name") // For ActiveMQ
    public void receiveMessageFromActiveMQ(String message) {
        System.out.println("Received message from ActiveMQ: " + message);
    }
 
    @MqttListener(topic = "topic-name") // For MQTT
    public void receiveMessageFromMQTT(@Payload String message) {
        System.out.println("Received message from MQTT: " + message);
    }
}
```

### 6. 示例用法

 

现在可以在你的服务或控制器中使用 `MessageSender` 发送消息,并且通过 `MessageReceiver` 接收消息了:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class MessagingController {
 
    @Autowired
    private MessageSender messageSender;
 
    @GetMapping("/send")
    public String sendMessage() {
        messageSender.sendMessageToActiveMQ("queue-name", "Hello from ActiveMQ!");
        messageSender.sendMessageToMQTT("topic-name", "Hello from MQTT!");
        return "Messages sent!";
    }
}
```

### 注意事项

 

- 确保ActiveMQ和MQTT代理(例如Mosquitto)已经正确安装和运行,并且端口和认证信息正确配置。

- 在实际部署中,考虑安全性、消息序列化和异常处理等因素。

 

### 1. ActiveMQ 配置详解

 

- **持久化配置**: 可以配置ActiveMQ以使用持久化存储,确保消息不会在代理关闭时丢失。可以通过配置`spring.activemq.broker-url`来指定ActiveMQ的连接URL。

- **队列和主题**: ActiveMQ支持队列(Queue)和主题(Topic)。队列用于点对点通信,主题用于发布-订阅模式。在`@JmsListener`中指定`destination`即可监听队列或主题。

 

### 2. MQTT 配置详解

 

- **QoS等级**: MQTT支持不同的服务质量(QoS)等级,包括0、1和2,可以在发送消息时设置适当的QoS级别。

- **持久会话**: MQTT客户端可以创建持久会话,确保即使客户端离线,消息也能够被保留和传递给客户端重新连接时。

- **TLS加密**: 如果需要安全通信,可以配置MQTT使用TLS/SSL加密。

 

### 3. 异常处理和重试

 

- 在生产环境中,确保添加适当的异常处理和重试机制,以处理连接错误、发送超时等情况。可以使用Spring的`RetryTemplate`来实现重试逻辑。

- 对于不同的消息队列系统(ActiveMQ和MQTT),可能需要定制化的异常处理策略。

 

### 4. 性能优化和监控

 

- 监控消息队列的性能和负载情况,以确保系统在高负载时能够稳定运行。可以使用JMX或者专业的监控工具。

- 针对高并发情况,考虑优化消息处理和线程管理策略,避免潜在的性能瓶颈。

 

### 5. 安全性考虑

 

- 配置消息队列的访问控制和认证机制,以确保只有授权的应用程序可以访问和发送消息。

- 对于敏感数据,考虑使用加密技术来保护消息内容的安全传输。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
6月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
59 0
|
2月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
119 1
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1041 3
|
5月前
|
消息中间件 Java Maven
|
5月前
|
XML 前端开发 Java
Spring MVC接收param参数(直接接收、注解接收、集合接收、实体接收)
Spring MVC提供了灵活多样的参数接收方式,可以满足各种不同场景下的需求。了解并熟练运用这些基本的参数接收技巧,可以使得Web应用的开发更加方便、高效。同时,也是提高代码的可读性和维护性的关键所在。在实际开发过程中,根据具体需求选择最合适的参数接收方式,能够有效提升开发效率和应用性能。
142 3
|
5月前
|
XML 前端开发 Java
Spring MVC接收param参数(直接接收、注解接收、集合接收、实体接收)
Spring MVC提供了灵活多样的参数接收方式,可以满足各种不同场景下的需求。了解并熟练运用这些基本的参数接收技巧,可以使得Web应用的开发更加方便、高效。同时,也是提高代码的可读性和维护性的关键所在。在实际开发过程中,根据具体需求选择最合适的参数接收方式,能够有效提升开发效率和应用性能。
239 2