使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

简介: 使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收

使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收是很常见的场景,让我们逐步来看如何实现。

 

### 1. 配置Spring Boot项目

 

首先,确保你的Spring Boot项目已经配置好。可以使用Spring Initializr生成一个简单的Spring Boot项目,并在`pom.xml`中添加必要的依赖项。

```xml
    org.springframework.boot
    spring-boot-starter-activemq
 
    org.springframework.boot
    spring-boot-starter-mqtt
```

 

### 2. 配置ActiveMQ

 

在`application.properties`(或`application.yml`)中配置ActiveMQ的连接信息:

```properties
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
```

这里使用了默认的ActiveMQ连接信息,你可以根据自己的实际情况进行修改。

 

### 3. 配置MQTT

 

如果需要使用MQTT,也需要在`application.properties`中添加相关配置(可选,如果不需要可以忽略):


同样,这里的连接信息也是默认的,你需要根据自己的MQTT代理配置进行修改。

 

### 4. 编写消息发送者

 

创建一个消息发送者组件,用来发送消息到ActiveMQ和MQTT:

 

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
 
@Component
public class MessageSender {
 
    @Autowired
    private JmsTemplate jmsTemplate; // For ActiveMQ
 
    @Autowired(required = false) // Optional: For MQTT
    private MqttPahoMessageHandler mqttPahoMessageHandler;
 
    public void sendMessageToActiveMQ(String destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
 
    public void sendMessageToMQTT(String topic, String message) {
        Message mqttMessage = MessageBuilder.withPayload(message).build();
        mqttPahoMessageHandler.handleMessage(mqttMessage);
    }
}
```

### 5. 编写消息接收者

 

创建消息接收者组件,用来从ActiveMQ和MQTT接收消息:

 

```java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
public class MessageReceiver {
 
    @JmsListener(destination = "queue-name") // For ActiveMQ
    public void receiveMessageFromActiveMQ(String message) {
        System.out.println("Received message from ActiveMQ: " + message);
    }
 
    @MqttListener(topic = "topic-name") // For MQTT
    public void receiveMessageFromMQTT(@Payload String message) {
        System.out.println("Received message from MQTT: " + message);
    }
}
```

### 6. 示例用法

 

现在可以在你的服务或控制器中使用 `MessageSender` 发送消息,并且通过 `MessageReceiver` 接收消息了:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
public class MessagingController {
 
    @Autowired
    private MessageSender messageSender;
 
    @GetMapping("/send")
    public String sendMessage() {
        messageSender.sendMessageToActiveMQ("queue-name", "Hello from ActiveMQ!");
        messageSender.sendMessageToMQTT("topic-name", "Hello from MQTT!");
        return "Messages sent!";
    }
}
```

### 注意事项

 

- 确保ActiveMQ和MQTT代理(例如Mosquitto)已经正确安装和运行,并且端口和认证信息正确配置。

- 在实际部署中,考虑安全性、消息序列化和异常处理等因素。

 

### 1. ActiveMQ 配置详解

 

- **持久化配置**: 可以配置ActiveMQ以使用持久化存储,确保消息不会在代理关闭时丢失。可以通过配置`spring.activemq.broker-url`来指定ActiveMQ的连接URL。

- **队列和主题**: ActiveMQ支持队列(Queue)和主题(Topic)。队列用于点对点通信,主题用于发布-订阅模式。在`@JmsListener`中指定`destination`即可监听队列或主题。

 

### 2. MQTT 配置详解

 

- **QoS等级**: MQTT支持不同的服务质量(QoS)等级,包括0、1和2,可以在发送消息时设置适当的QoS级别。

- **持久会话**: MQTT客户端可以创建持久会话,确保即使客户端离线,消息也能够被保留和传递给客户端重新连接时。

- **TLS加密**: 如果需要安全通信,可以配置MQTT使用TLS/SSL加密。

 

### 3. 异常处理和重试

 

- 在生产环境中,确保添加适当的异常处理和重试机制,以处理连接错误、发送超时等情况。可以使用Spring的`RetryTemplate`来实现重试逻辑。

- 对于不同的消息队列系统(ActiveMQ和MQTT),可能需要定制化的异常处理策略。

 

### 4. 性能优化和监控

 

- 监控消息队列的性能和负载情况,以确保系统在高负载时能够稳定运行。可以使用JMX或者专业的监控工具。

- 针对高并发情况,考虑优化消息处理和线程管理策略,避免潜在的性能瓶颈。

 

### 5. 安全性考虑

 

- 配置消息队列的访问控制和认证机制,以确保只有授权的应用程序可以访问和发送消息。

- 对于敏感数据,考虑使用加密技术来保护消息内容的安全传输。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7天前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4天前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
11 1
|
4天前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
6天前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6天前
|
消息中间件 Java Apache
使用Spring Boot实现与ActiveMQ的消息队列集成
使用Spring Boot实现与ActiveMQ的消息队列集成
|
7天前
|
消息中间件 JavaScript Java
消息队列 MQ产品使用合集之如何嵌入到Spring Boot中运行
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
9天前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
9天前
|
消息中间件 Java 机器人
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
9天前
|
消息中间件 Java 机器人
使用Spring Boot实现与ActiveMQ的消息队列集成
使用Spring Boot实现与ActiveMQ的消息队列集成
|
9天前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化