SpringBoot整合MQTT实战:基于EMQX实现双向设备通信

本文涉及的产品
多模态交互后付费免费试用,全链路、全Agent
简介: 本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。

简言:

在万物互联的时代,MQTT协议凭借其轻量级、高效率的特性,已成为物联网通信的事实标准。本教程将带领您在Ubuntu系统上搭建EMQX 5.9.0消息服务器,并使用Spring Boot快速实现两个客户端的高效通信。通过本指南,您将掌握:

企业级MQTT消息中间件的部署

Spring Boot与MQTT协议的深度集成

双向实时通信的完整实现方案

生产级应用的最佳实践建议


源码地址:https://gitcode.com/Var_ya/mqtt_viteClient

参考文档:

  1. 在 Ubuntu 上安装 EMQX:https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
  2. MQTTX 下载:https://mqttx.app/zh/downloads

一、🛠️ 搭建魔法邮局(EMQX服务器)

扩展:在安装EMQX前记得先更新先软件包

apt update

1. 安装EMQX企业版
在Ubuntu终端输入以下咒语:

# 下载魔法卷轴(安装包)
wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb

# 解开卷轴封印
sudo dpkg -i emqx-enterprise-5.9.0-ubuntu20.04-amd64.deb

# 启动邮局服务
sudo systemctl start emqx

2. 打开魔法管理台
浏览器访问 http://localhost:18083,默认账号admin/public,你将看到:


二、📱 准备第一个信使(MQTTX客户端)

安装MQTTX桌面版

安装地址:https://mqttx.app/zh/downloads

打开后新建连接:

  • 名称:魔法邮箱_varin.cn
  • 服务器:varin:1883


🔍 让我们用Spring Boot的魔法升级Java程序! 把魔杖(原生Java)换成自动施法的魔法书(Spring Boot)~

三、Spring Boot的核心初始化

1. 创建魔法卷轴(Spring Boot项目)

Spring Initializr生成项目,勾选:

  • Spring Web (发送HTTP咒语)
  • Spring Integration (MQTT魔法核心)

2. 添加飞天扫帚驱动(POM依赖)

<!--        消息中间件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
<!--        流消息-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
<!--   核心依赖:     mqtt客户端-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.5</version>
        </dependency>

3.设置application.yml内容

spring:
  application:
    name: mqtt-client-api
  mqtt:
    username: varya
    password: 123456
    url: tcp://varin.cn:1883
    subClientId: sub_client_id_varya
    subTopic: mqttx_and_springboot_client/,
    pubClientId: pub_client_id_vay
server:
  port: 9999
# knife4j的增强配置,不需要增强可以不配
knife4j:
  enable: true    # 开启knife4j,无需添加@EnableKnife4j注解
  setting:
    language: zh_cn   #中文
  #  swagger-model-name: 实体列表   #默认为: Swagger Models
  basic: # 开启Swagger的Basic认证功能,默认是false
    enable: false
    username: varya
    password: varya

3.建立读取mqtt关于application.yml文件实体

package cn.varin.mqttclientapi.entity;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * mqtt配置属性实体类
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt") // 读取yml文件中的配置
public class MqttConfigProperties {
    private String username;
    private String password;
    private String url;
    private String subClientId;
    private String subTopic;
    private String pubClientId;
}

4. 参考文件目录设置

(注:该代码已上传gitcode代码仓库,欢迎阅读,下载


🧙♂️ 四、Mqtt核心基础配置(代码篇)

1. MqttConfig(mqtt配置类)

package cn.varin.mqttclientapi.config;
import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Configuration
public class MqttConfig {
    // yml获取配置内容
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
//    连接工厂建立
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory (){
        // 建立默认工程
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        // 设置连接选项内容
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setUserName(mqttConfigProperties.getUsername());
        mqttConnectOptions.setPassword(mqttConfigProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        defaultMqttPahoClientFactory.setConnectionOptions(mqttConnectOptions );
        return  defaultMqttPahoClientFactory;
    }
}

🧙♂️ 五、Mqtt入站信息配置(代码篇)


1. MqttConfig(mqtt配置类)

package cn.varin.mqttclientapi.config;
import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import cn.varin.mqttclientapi.handler.MqttMessageHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 *
 * 配饰入站消息配置
 */
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;
    // 建立入站通道
    @Bean
    public  MessageChannel messageInboundChannel(){
        return  new DirectChannel();
    }
    // 配置入站适配器
    @Bean
    public MessageProducer messageProducer(){
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConfigProperties.getUrl(),
                mqttConfigProperties.getSubClientId(),
                mqttPahoClientFactory,
                mqttConfigProperties.getSubTopic().split(",")
        );
        mqttPahoMessageDrivenChannelAdapter.setQos(2);
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        // 设置通道
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel( messageInboundChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }
    // 设置接收消息处理器
//    @Bean
//    @ServiceActivator(inputChannel = "messageInboundChannel")
//    public MessageHandler messageHandler (){
//        return new MqttMessageHandle();
//    }
}

2. 建立入站信息处理器(MqttMessageHandle)

package cn.varin.mqttclientapi.handler;
import cn.varin.mqttclientapi.entity.MqttMessageResponseBody;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.swagger.v3.core.util.Json;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 *
 * 接收消息处理器
 *
 */
@Component
public class MqttMessageHandle implements MessageHandler {
   @ServiceActivator(inputChannel = "messageInboundChannel") // 用于指定通道
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
       System.out.println("=================");
       MessageHeaders headers = message.getHeaders();
       String mqtt_receivedTopic = headers.get("mqtt_receivedTopic").toString();
       System.out.println(mqtt_receivedTopic);
       System.out.println("=================");
    }
}

🧙♂️ 六、Mqtt出站信息配置(代码篇)

1. MqttOutboundConfig(mqtt出站信息配置类)

package cn.varin.mqttclientapi.config;
import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttOutboundConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;
    // 建立出站通道
    @Bean
    public MessageChannel messageOutboundChannel(){
        return new DirectChannel();
    }
    // 建立发送消息配置
    @ServiceActivator(inputChannel = "messageOutboundChannel")
    @Bean
    public MessageHandler messageOutboundHandle(){
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        mqttConfigProperties.getUrl(),
        mqttConfigProperties.getPubClientId(),
        mqttPahoClientFactory
        );
        messageHandler.setDefaultQos(2);
        messageHandler.setDefaultTopic("default");
        messageHandler.setAsync(true);
        return messageHandler;
    }
}

2. 建立发送消息网关(MqttGetway)

package cn.varin.mqttclientapi.getway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGetway {
    void send(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
    void send(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) Integer qos, String payload);
}

2. 建立mqtt发送消息服务

package cn.varin.mqttclientapi.service;
public interface MqttMessageSenderService {
    void send(String topic, String payload);
    void send(String topic, Integer qos, String payload);
}
package cn.varin.mqttclientapi.service.impl;
import cn.varin.mqttclientapi.getway.MqttGetway;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MqttMessageSenderServiceImpl  implements MqttMessageSenderService {
    @Autowired
    private MqttGetway mqttGetway;
    @Override
    public void send(String topic, String payload) {
        mqttGetway.send(topic,payload);
    }
    @Override
    public void send(String topic, Integer qos, String payload) {
        mqttGetway.send(topic,qos,payload);
    }
}

🧙♂️ 七、Mqtt消息发送Controller(代码篇)

package cn.varin.mqttclientapi.controller;
import cn.varin.mqttclientapi.entity.MqttRequestBody;
import cn.varin.mqttclientapi.handler.UnifiedResponseHandler;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import cn.varin.mqttclientapi.service.impl.MqttMessageSenderServiceImpl;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@Tag(name = "MQTT服务接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {
    @Autowired
    private MqttMessageSenderServiceImpl mqttMessageSenderService;
    @Operation(summary = "发送消息,")
    @PostMapping("/send,有qos")
    public UnifiedResponseHandler.Result send(@RequestBody MqttRequestBody mqttRequestBody){
            System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(),mqttRequestBody.getQos(),mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200,"success",null);
    }
    @Operation(summary = "发送消息,无qos")
    @PostMapping("/send")
    public UnifiedResponseHandler.Result send2(@RequestBody MqttRequestBody mqttRequestBody){
        System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(),mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200,"success",null);
    }
}

🧙♂️ 八、MqttTest测试文件(代码篇)

  1. 参考目录

  1. 建立test启动类
package cn.varin.mqttclientapi;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class MqttClientApiApplicationTests {
    @Test
    void contextLoads() {
    }
}
  1. 测试代码
package cn.varin.mqttclientapi.test;
import cn.varin.mqttclientapi.service.impl.MqttMessageSenderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(value = "MqttClientApiApplicationTests.class")
public class MqttMesageSenderTest {
    @Autowired
    private MqttMessageSenderServiceImpl mqttMessageSenderService;
    @Test
    public void MqttMessageSendTest(){
        // 实际业务
        mqttMessageSenderService.send("java_test/","testaaa");
    }
}

🌌🚀  九、通信魔法测试大赏

场景1:使用Test测试类测试

  1. 点击启动按钮(画红线的绿色按钮)

  1. 显示测试结果

场景2:HTTP请求测试(使用idea自带的http接口测试插件)

  1. 在MQTTX发送:
POST http://localhost:9999/mqtt/send
Content-Type: application/json
{
  "mqtt_topic":"java_test/",
  "qos":2,
  "payload":"h33333ello"
}

(点击红线上的绿色按钮)

  1. 测试结果:

  1. Spring Boot控制台会:



升级完毕! 现在你的MQTT程序拥有了Spring Boot的自动施法能力,就像拥有了老魔杖+隐形斗篷+复活石的组合!快去征服分布式魔法世界吧~ 🎩

常见问题排查

现象

检查方向

解决手段

连接失败

防火墙设置/端口开放

netstat -tulnp

消息丢失

QoS级别配置

确认使用QoS1/2

高延迟

网络带宽/负载均衡

EMQX集群横向扩展


通过本方案,您已经构建了一个基于Spring Boot的企业级MQTT通信系统。这种架构可广泛应用于物联网设备管理、实时数据采集、远程控制等场景,为智能硬件与云端系统搭建了可靠的消息桥梁。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
27天前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2377 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
5月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
511 4
|
4月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
588 0
|
6月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
20天前
|
前端开发 安全 Java
基于springboot+vue开发的会议预约管理系统
一个完整的会议预约管理系统,包含前端用户界面、管理后台和后端API服务。 ### 后端 - **框架**: Spring Boot 2.7.18 - **数据库**: MySQL 5.6+ - **ORM**: MyBatis Plus 3.5.3.1 - **安全**: Spring Security + JWT - **Java版本**: Java 11 ### 前端 - **框架**: Vue 3.3.4 - **UI组件**: Element Plus 2.3.8 - **构建工具**: Vite 4.4.5 - **状态管理**: Pinia 2.1.6 - **HTTP客户端
125 4
基于springboot+vue开发的会议预约管理系统
|
5月前
|
JavaScript 前端开发 Java
制造业ERP源码,工厂ERP管理系统,前端框架:Vue,后端框架:SpringBoot
这是一套基于SpringBoot+Vue技术栈开发的ERP企业管理系统,采用Java语言与vscode工具。系统涵盖采购/销售、出入库、生产、品质管理等功能,整合客户与供应商数据,支持在线协同和业务全流程管控。同时提供主数据管理、权限控制、工作流审批、报表自定义及打印、在线报表开发和自定义表单功能,助力企业实现高效自动化管理,并通过UniAPP实现移动端支持,满足多场景应用需求。
464 1