Java 开发中基于 Spring Boot 框架实现 MQTT 消息推送与订阅功能详解

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文介绍基于Spring Boot集成MQTT协议实现消息推送与订阅的技术方案。涵盖MQTT协议概述、核心概念(Broker、Client、Topic、QoS)及应用场景,详细说明在Spring Boot中通过配置依赖、连接信息、客户端工厂及消息通道实现消息发布与订阅服务。提供物联网设备监控系统的应用实例,包括设备状态上报、服务器指令下发和实时数据处理。同时,探讨单元测试方法、生产环境部署注意事项(安全配置、性能优化、高可用性)以及总结MQTT在高效可靠消息通信系统中的应用价值。资源链接:[点击查看](https://pan.quark.cn/s/14fcf913bae6)。

Spring Boot集成MQTT实现消息推送与订阅技术方案

一、MQTT协议概述与应用场景

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,具有低带宽占用、低功耗、支持QoS等级等特点,广泛应用于物联网、移动应用、即时通讯等场景。

核心概念:

  • Broker:消息代理服务器,处理客户端连接和消息路由
  • Client:消息发布者或订阅者
  • Topic:消息主题,用于消息分类和过滤
  • QoS:服务质量等级(0-最多一次,1-至少一次,2-仅一次)

二、Spring Boot集成MQTT实现方案

1. 引入依赖

pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 配置MQTT连接信息

application.properties中配置MQTT服务器地址和认证信息:

# MQTT配置
mqtt.host=tcp://localhost:1883
mqtt.clientId=spring-boot-mqtt-client
mqtt.username=admin
mqtt.password=password
mqtt.defaultTopic=test/topic

3. 创建MQTT配置类

配置MQTT客户端工厂和消息通道:

@Configuration
public class MqttConfig {
   

    @Value("${mqtt.host}")
    private String host;

    @Value("${mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;

    // 配置MQTT客户端工厂
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
   
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{
   host});
        mqttConnectOptions.setKeepAliveInterval(20);
        return mqttConnectOptions;
    }

    // 配置MQTT客户端
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
   
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    // 其他配置...
}

4. 实现消息发布服务

创建服务类实现消息发布功能:

@Service
public class MqttPublisher {
   

    private final MqttPahoClientFactory mqttClientFactory;
    private final String clientId;

    public MqttPublisher(MqttPahoClientFactory mqttClientFactory, 
                         @Value("${mqtt.clientId}") String clientId) {
   
        this.mqttClientFactory = mqttClientFactory;
        this.clientId = clientId + "-publisher";
    }

    public void publish(String topic, String payload) {
   
        try (MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId, mqttClientFactory)) {
   
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(topic);
            messageHandler.start();
            messageHandler.handleMessage(new GenericMessage<>(payload));
        } catch (Exception e) {
   
            log.error("MQTT publish error: {}", e.getMessage(), e);
        }
    }
}

5. 实现消息订阅服务

配置消息监听器接收订阅消息:

@Configuration
public class MqttSubscriberConfig {
   

    @Bean
    public MessageChannel mqttInputChannel() {
   
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) {
   
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "subscriberClient", 
                        mqttClientFactory,
                        "test/topic/#"); // 订阅主题通配符
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
   
        return message -> {
   
            String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
            String payload = message.getPayload().toString();
            System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload);
            // 处理接收到的消息
        };
    }
}

三、应用实例:物联网设备监控系统

场景说明:

构建一个简单的物联网设备监控系统,实现:

  1. 设备状态数据定时上报
  2. 服务器远程控制指令下发
  3. 实时数据展示与告警

1. 设备端实现(模拟)

@Service
public class DeviceSimulator {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void startReporting(String deviceId) {
   
        scheduler.scheduleAtFixedRate(() -> {
   
            // 生成模拟设备数据
            Map<String, Object> payload = new HashMap<>();
            payload.put("deviceId", deviceId);
            payload.put("temperature", 20 + new Random().nextInt(10));
            payload.put("humidity", 40 + new Random().nextInt(20));
            payload.put("status", "online");
            payload.put("timestamp", System.currentTimeMillis());

            // 发布设备数据到MQTT
            String jsonPayload = new ObjectMapper().writeValueAsString(payload);
            mqttPublisher.publish("device/data/" + deviceId, jsonPayload);
        }, 0, 5, TimeUnit.SECONDS); // 每5秒上报一次
    }
}

2. 服务端数据处理

@Service
public class DeviceDataService {
   

    @Autowired
    private DeviceRepository deviceRepository;

    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleDeviceData(Message<?> message) {
   
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        String payload = message.getPayload().toString();

        // 解析设备数据
        if (topic.startsWith("device/data/")) {
   
            try {
   
                DeviceData deviceData = new ObjectMapper().readValue(payload, DeviceData.class);
                // 保存设备数据
                deviceRepository.save(deviceData);
                // 检查告警阈值
                checkAlarms(deviceData);
            } catch (JsonProcessingException e) {
   
                log.error("解析设备数据失败: {}", e.getMessage(), e);
            }
        }
    }

    private void checkAlarms(DeviceData data) {
   
        // 检查温度是否超过阈值
        if (data.getTemperature() > 30) {
   
            // 触发高温告警
            sendAlarm("高温告警", 
                    "设备 " + data.getDeviceId() + " 温度异常: " + data.getTemperature() + "°C");
        }
    }

    private void sendAlarm(String title, String content) {
   
        // 发送告警通知
        log.warn("ALARM: {} - {}", title, content);
    }
}

3. 控制指令下发

@RestController
@RequestMapping("/api/device")
public class DeviceController {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    @PostMapping("/{deviceId}/command")
    public ResponseEntity<String> sendCommand(
            @PathVariable String deviceId,
            @RequestBody DeviceCommand command) {
   

        try {
   
            // 将命令转换为JSON格式
            String payload = new ObjectMapper().writeValueAsString(command);
            // 发布命令到设备
            mqttPublisher.publish("device/command/" + deviceId, payload);
            return ResponseEntity.ok("命令已发送");
        } catch (Exception e) {
   
            log.error("发送设备命令失败: {}", e.getMessage(), e);
            return ResponseEntity.status(500).body("发送命令失败");
        }
    }
}

四、测试与验证

1. 单元测试示例

@SpringBootTest
class MqttIntegrationTest {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    @Test
    void testMqttPublishAndSubscribe() throws Exception {
   
        // 创建测试主题
        String testTopic = "test/integration/" + UUID.randomUUID().toString();
        String testPayload = "Hello, MQTT!";

        // 设置异步测试监听器
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> receivedPayload = new AtomicReference<>();

        MessageHandler testHandler = message -> {
   
            receivedPayload.set(message.getPayload().toString());
            latch.countDown();
        };

        // 发布消息
        mqttPublisher.publish(testTopic, testPayload);

        // 等待消息接收
        assertTrue(latch.await(5, TimeUnit.SECONDS));
        assertEquals(testPayload, receivedPayload.get());
    }
}

2. 使用MQTT客户端工具测试

可以使用MQTT.fx、HiveMQ Client等工具连接到MQTT Broker,手动发布和订阅消息进行测试。

五、生产环境部署注意事项

  1. 安全配置

    • 使用TLS加密连接(tcp:// → ssl://)
    • 启用客户端认证和权限控制
    • 定期更换凭证和密钥
  2. 性能优化

    • 根据业务量调整QoS等级
    • 合理设计主题层级结构
    • 考虑使用集群部署提高吞吐量
  3. 高可用性

    • 配置MQTT Broker集群
    • 实现客户端自动重连机制
    • 考虑消息持久化存储

六、总结

通过Spring Boot集成MQTT,我们可以快速实现高效、可靠的消息通信系统。本文介绍了MQTT的基本概念、Spring Boot集成方案、实际应用案例以及生产环境部署注意事项。在实际项目中,可以根据具体需求扩展功能,如添加消息持久化、分布式处理、多协议适配等。


Java 开发,Spring Boot 框架,MQTT 协议,消息推送,消息订阅,物联网开发,实时通信,微服务架构,Spring Integration,MQTT 客户端,QoS 机制,遗嘱消息,主题订阅,消息持久化,异步通信



资源地址:
https://pan.quark.cn/s/14fcf913bae6


相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
Java Spring
聊聊你对SpringBoot框架的理解 ?
SpringBoot是Spring家族中流行的子项目,旨在简化Spring框架开发的繁琐配置。它主要提供三大功能:starter起步依赖简化依赖管理,自动配置根据条件创建Bean,以及内嵌Web服务器支持Jar包运行,极大提升了开发效率。
29 0
|
6天前
|
安全 Java 领域建模
Java 17 探秘:不容错过的现代开发利器
Java 17 探秘:不容错过的现代开发利器
|
2天前
|
安全 Java API
Java 17 及以上版本核心特性在现代开发实践中的深度应用与高效实践方法 Java 开发实践
本项目以“学生成绩管理系统”为例,深入实践Java 17+核心特性与现代开发技术。采用Spring Boot 3.1、WebFlux、R2DBC等构建响应式应用,结合Record类、模式匹配、Stream优化等新特性提升代码质量。涵盖容器化部署(Docker)、自动化测试、性能优化及安全加固,全面展示Java最新技术在实际项目中的应用,助力开发者掌握现代化Java开发方法。
18 1
|
6天前
|
NoSQL Java 数据库连接
SpringBoot框架
Spring Boot 是 Spring 家族中最流行的框架,旨在简化 Spring 应用的初始搭建与开发。它通过自动配置、起步依赖和内嵌服务器三大核心功能,大幅减少配置复杂度,提升开发效率。开发者可快速构建独立运行的 Web 应用,并支持多种数据访问技术和第三方集成。
|
7天前
|
Java 数据库连接 API
Java 8 + 特性及 Spring Boot 与 Hibernate 等最新技术的实操内容详解
本内容涵盖Java 8+核心语法、Spring Boot与Hibernate实操,按考试考点分类整理,含技术详解与代码示例,助力掌握最新Java技术与应用。
22 2
|
2天前
|
缓存 安全 Java
Spring 框架核心原理与实践解析
本文详解 Spring 框架核心知识,包括 IOC(容器管理对象)与 DI(容器注入依赖),以及通过注解(如 @Service、@Autowired)声明 Bean 和注入依赖的方式。阐述了 Bean 的线程安全(默认单例可能有安全问题,需业务避免共享状态或设为 prototype)、作用域(@Scope 注解,常用 singleton、prototype 等)及完整生命周期(实例化、依赖注入、初始化、销毁等步骤)。 解析了循环依赖的解决机制(三级缓存)、AOP 的概念(公共逻辑抽为切面)、底层动态代理(JDK 与 Cglib 的区别)及项目应用(如日志记录)。介绍了事务的实现(基于 AOP
|
3天前
|
存储 缓存 NoSQL
Spring Cache缓存框架
Spring Cache是Spring体系下的标准化缓存框架,支持多种缓存(如Redis、EhCache、Caffeine),可独立或组合使用。其优势包括平滑迁移、注解与编程两种使用方式,以及高度解耦和灵活管理。通过动态代理实现缓存操作,适用于不同业务场景。
|
3天前
|
IDE Java API
Java 17 新特性与微服务开发的实操指南
本内容涵盖Java 11至Java 17最新特性实战,包括var关键字、字符串增强、模块化系统、Stream API、异步编程、密封类等,并提供图书管理系统实战项目,帮助开发者掌握现代Java开发技巧与工具。
16 0
|
4天前
|
消息中间件 NoSQL Java
SpringBoot框架常见的starter你都用过哪些 ?
本节介绍常见的Spring Boot Starter,分为官方(如Web、AOP、Redis等)与第三方(如MyBatis、MyBatis Plus)两类,用于快速集成Web开发、数据库、消息队列等功能。
22 0
|
5天前
|
缓存 安全 Java
第五章 Spring框架
第五章 Spring框架