Java 开发中基于 Spring Boot 框架实现 MQTT 消息推送与订阅功能详解

简介: 本文介绍基于Spring Boot集成MQTT协议实现消息推送与订阅的技术方案。涵盖MQTT协议概述、核心概念(Broker、Client、Topic、QoS)及应用场景,详细说明在Spring Boot中通过配置依赖、连接信息、客户端工厂及消息通道实现消息发布与订阅服务。提供物联网设备监控系统的应用实例,包括设备状态上报、服务器指令下发和实时数据处理。同时,探讨单元测试方法、生产环境部署注意事项(安全配置、性能优化、高可用性)以及总结MQTT在高效可靠消息通信系统中的应用价值。资源链接:[点击查看](https://pan.quark.cn/s/14fcf913bae6)。

Spring Boot集成MQTT实现消息推送与订阅技术方案

一、MQTT协议概述与应用场景

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,具有低带宽占用、低功耗、支持QoS等级等特点,广泛应用于物联网、移动应用、即时通讯等场景。

核心概念:

  • Broker:消息代理服务器,处理客户端连接和消息路由
  • Client:消息发布者或订阅者
  • Topic:消息主题,用于消息分类和过滤
  • QoS:服务质量等级(0-最多一次,1-至少一次,2-仅一次)

二、Spring Boot集成MQTT实现方案

1. 引入依赖

pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 配置MQTT连接信息

application.properties中配置MQTT服务器地址和认证信息:

# MQTT配置
mqtt.host=tcp://localhost:1883
mqtt.clientId=spring-boot-mqtt-client
mqtt.username=admin
mqtt.password=password
mqtt.defaultTopic=test/topic

3. 创建MQTT配置类

配置MQTT客户端工厂和消息通道:

@Configuration
public class MqttConfig {
   

    @Value("${mqtt.host}")
    private String host;

    @Value("${mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;

    // 配置MQTT客户端工厂
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
   
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{
   host});
        mqttConnectOptions.setKeepAliveInterval(20);
        return mqttConnectOptions;
    }

    // 配置MQTT客户端
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
   
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    // 其他配置...
}

4. 实现消息发布服务

创建服务类实现消息发布功能:

@Service
public class MqttPublisher {
   

    private final MqttPahoClientFactory mqttClientFactory;
    private final String clientId;

    public MqttPublisher(MqttPahoClientFactory mqttClientFactory, 
                         @Value("${mqtt.clientId}") String clientId) {
   
        this.mqttClientFactory = mqttClientFactory;
        this.clientId = clientId + "-publisher";
    }

    public void publish(String topic, String payload) {
   
        try (MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId, mqttClientFactory)) {
   
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(topic);
            messageHandler.start();
            messageHandler.handleMessage(new GenericMessage<>(payload));
        } catch (Exception e) {
   
            log.error("MQTT publish error: {}", e.getMessage(), e);
        }
    }
}

5. 实现消息订阅服务

配置消息监听器接收订阅消息:

@Configuration
public class MqttSubscriberConfig {
   

    @Bean
    public MessageChannel mqttInputChannel() {
   
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) {
   
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "subscriberClient", 
                        mqttClientFactory,
                        "test/topic/#"); // 订阅主题通配符
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
   
        return message -> {
   
            String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
            String payload = message.getPayload().toString();
            System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload);
            // 处理接收到的消息
        };
    }
}

三、应用实例:物联网设备监控系统

场景说明:

构建一个简单的物联网设备监控系统,实现:

  1. 设备状态数据定时上报
  2. 服务器远程控制指令下发
  3. 实时数据展示与告警

1. 设备端实现(模拟)

@Service
public class DeviceSimulator {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void startReporting(String deviceId) {
   
        scheduler.scheduleAtFixedRate(() -> {
   
            // 生成模拟设备数据
            Map<String, Object> payload = new HashMap<>();
            payload.put("deviceId", deviceId);
            payload.put("temperature", 20 + new Random().nextInt(10));
            payload.put("humidity", 40 + new Random().nextInt(20));
            payload.put("status", "online");
            payload.put("timestamp", System.currentTimeMillis());

            // 发布设备数据到MQTT
            String jsonPayload = new ObjectMapper().writeValueAsString(payload);
            mqttPublisher.publish("device/data/" + deviceId, jsonPayload);
        }, 0, 5, TimeUnit.SECONDS); // 每5秒上报一次
    }
}

2. 服务端数据处理

@Service
public class DeviceDataService {
   

    @Autowired
    private DeviceRepository deviceRepository;

    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleDeviceData(Message<?> message) {
   
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        String payload = message.getPayload().toString();

        // 解析设备数据
        if (topic.startsWith("device/data/")) {
   
            try {
   
                DeviceData deviceData = new ObjectMapper().readValue(payload, DeviceData.class);
                // 保存设备数据
                deviceRepository.save(deviceData);
                // 检查告警阈值
                checkAlarms(deviceData);
            } catch (JsonProcessingException e) {
   
                log.error("解析设备数据失败: {}", e.getMessage(), e);
            }
        }
    }

    private void checkAlarms(DeviceData data) {
   
        // 检查温度是否超过阈值
        if (data.getTemperature() > 30) {
   
            // 触发高温告警
            sendAlarm("高温告警", 
                    "设备 " + data.getDeviceId() + " 温度异常: " + data.getTemperature() + "°C");
        }
    }

    private void sendAlarm(String title, String content) {
   
        // 发送告警通知
        log.warn("ALARM: {} - {}", title, content);
    }
}

3. 控制指令下发

@RestController
@RequestMapping("/api/device")
public class DeviceController {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    @PostMapping("/{deviceId}/command")
    public ResponseEntity<String> sendCommand(
            @PathVariable String deviceId,
            @RequestBody DeviceCommand command) {
   

        try {
   
            // 将命令转换为JSON格式
            String payload = new ObjectMapper().writeValueAsString(command);
            // 发布命令到设备
            mqttPublisher.publish("device/command/" + deviceId, payload);
            return ResponseEntity.ok("命令已发送");
        } catch (Exception e) {
   
            log.error("发送设备命令失败: {}", e.getMessage(), e);
            return ResponseEntity.status(500).body("发送命令失败");
        }
    }
}

四、测试与验证

1. 单元测试示例

@SpringBootTest
class MqttIntegrationTest {
   

    @Autowired
    private MqttPublisher mqttPublisher;

    @Test
    void testMqttPublishAndSubscribe() throws Exception {
   
        // 创建测试主题
        String testTopic = "test/integration/" + UUID.randomUUID().toString();
        String testPayload = "Hello, MQTT!";

        // 设置异步测试监听器
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> receivedPayload = new AtomicReference<>();

        MessageHandler testHandler = message -> {
   
            receivedPayload.set(message.getPayload().toString());
            latch.countDown();
        };

        // 发布消息
        mqttPublisher.publish(testTopic, testPayload);

        // 等待消息接收
        assertTrue(latch.await(5, TimeUnit.SECONDS));
        assertEquals(testPayload, receivedPayload.get());
    }
}

2. 使用MQTT客户端工具测试

可以使用MQTT.fx、HiveMQ Client等工具连接到MQTT Broker,手动发布和订阅消息进行测试。

五、生产环境部署注意事项

  1. 安全配置

    • 使用TLS加密连接(tcp:// → ssl://)
    • 启用客户端认证和权限控制
    • 定期更换凭证和密钥
  2. 性能优化

    • 根据业务量调整QoS等级
    • 合理设计主题层级结构
    • 考虑使用集群部署提高吞吐量
  3. 高可用性

    • 配置MQTT Broker集群
    • 实现客户端自动重连机制
    • 考虑消息持久化存储

六、总结

通过Spring Boot集成MQTT,我们可以快速实现高效、可靠的消息通信系统。本文介绍了MQTT的基本概念、Spring Boot集成方案、实际应用案例以及生产环境部署注意事项。在实际项目中,可以根据具体需求扩展功能,如添加消息持久化、分布式处理、多协议适配等。


Java 开发,Spring Boot 框架,MQTT 协议,消息推送,消息订阅,物联网开发,实时通信,微服务架构,Spring Integration,MQTT 客户端,QoS 机制,遗嘱消息,主题订阅,消息持久化,异步通信



资源地址:
https://pan.quark.cn/s/14fcf913bae6


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
安全 Java Ruby
我尝试了所有后端框架 — — 这就是为什么只有 Spring Boot 幸存下来
作者回顾后端开发历程,指出多数框架在生产环境中难堪重负。相比之下,Spring Boot凭借内置安全、稳定扩展、完善生态和企业级支持,成为构建高可用系统的首选,真正经受住了时间与规模的考验。
297 2
|
2月前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
3761 50
|
2月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
3月前
|
人工智能 Java 开发者
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
JManus是阿里开源的Java版OpenManus,基于Spring AI Alibaba框架,助力Java开发者便捷应用AI技术。支持多Agent框架、网页配置、MCP协议及PLAN-ACT模式,可集成多模型,适配阿里云百炼平台与本地ollama。提供Docker与源码部署方式,具备无限上下文处理能力,适用于复杂AI场景。当前仍在完善模型配置等功能,欢迎参与开源共建。
1616 58
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
|
2月前
|
存储 安全 Java
《数据之美》:Java集合框架全景解析
Java集合框架是数据管理的核心工具,涵盖List、Set、Map等体系,提供丰富接口与实现类,支持高效的数据操作与算法处理。
|
2月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
144 8
|
2月前
|
存储 算法 安全
Java集合框架:理解类型多样性与限制
总之,在 Java 题材中正确地应对多样化与约束条件要求开发人员深入理解面向对象原则、范式编程思想以及JVM工作机理等核心知识点。通过精心设计与周密规划能够有效地利用 Java 高级特征打造出既健壮又灵活易维护系统软件产品。
106 7
|
2月前
|
XML Java 应用服务中间件
【SpringBoot(一)】Spring的认知、容器功能讲解与自动装配原理的入门,带你熟悉Springboot中基本的注解使用
SpringBoot专栏开篇第一章,讲述认识SpringBoot、Bean容器功能的讲解、自动装配原理的入门,还有其他常用的Springboot注解!如果想要了解SpringBoot,那么就进来看看吧!
430 2
|
3月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
1422 1
|
3月前
|
消息中间件 人工智能 Java
抖音微信爆款小游戏大全:免费休闲/竞技/益智/PHP+Java全筏开源开发
本文基于2025年最新行业数据,深入解析抖音/微信爆款小游戏的开发逻辑,重点讲解PHP+Java双引擎架构实战,涵盖技术选型、架构设计、性能优化与开源生态,提供完整开源工具链,助力开发者从理论到落地打造高留存、高并发的小游戏产品。

热门文章

最新文章

相关产品

  • 云消息队列 MQ