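Introduction:
In the era of the Internet of Things, MQTT has become the de facto standard for IoT messaging thanks to its lightweight, efficient design. This tutorial walks you through setting up an EMQX 5.9.0 message broker on Ubuntu and using Spring Boot to get two clients (MQTTX and a Spring Boot application) talking to each other. By the end you will have covered:
✅ Deploying an enterprise-grade MQTT message broker
✅ Integrating Spring Boot with MQTT
✅ A complete two-way, real-time messaging implementation
✅ Practical tips for production use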
Source code: https://gitcode.com/Var_ya/mqtt_viteClient
References:
- Installing EMQX on Ubuntu: https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
- MQTTX downloads: https://mqttx.app/zh/downloads
I. 🛠️ Setting Up the Magic Post Office (EMQX Server)
Tip: refresh the package index before installing EMQX.
sudo apt update
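1. Install EMQX Enterprise
Enter the following spells in an Ubuntu terminal:
# Download the magic scroll (the installation package).
# The file name encodes the target release (ubuntu24.04 here); pick the build that matches your system.
wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb
# Break the seal on the scroll (install the file that was just downloaded)
sudo dpkg -i emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb
# Open the post office (start the service)
sudo systemctl start emqx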
2. Open the Magic Dashboard
Open http://localhost:18083 in a browser and log in with the default account admin/public to reach the EMQX Dashboard.
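II. 📱 Preparing the First Messenger (MQTTX Client)
Install the MQTTX desktop app
Download: https://mqttx.app/zh/downloads
After launching it, create a new connection:
- Name: 魔法邮箱_varin.cn ("magic mailbox")
- Host: varin.cn:1883 (the broker address that application.yml also uses; substitute your own server)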
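🔍 Now let's upgrade the Java program with Spring Boot magic, trading the wand (plain Java) for a self-casting spellbook (Spring Boot).
✨ III. Core Spring Boot Setup
1. Create the magic scroll (a Spring Boot project)
Generate a project with Spring Initializr and select:
- Spring Web (casts HTTP spells)
- Spring Integration (the heart of the MQTT magic)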
2. Add the broomstick drivers (POM dependencies)
<!-- Messaging middleware: Spring Integration starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Stream support -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- Core dependency: MQTT client -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>
3. Configure application.yml
spring:
  application:
    name: mqtt-client-api
  mqtt:
    username: varya
    password: 123456
    url: tcp://varin.cn:1883
    subClientId: sub_client_id_varya
    # Comma-separated list of topics to subscribe to
    subTopic: mqttx_and_springboot_client/,
    pubClientId: pub_client_id_vay
server:
  port: 9999
# knife4j enhancements; omit this block if you do not need them
knife4j:
  enable: true   # enables knife4j without the @EnableKnife4j annotation
  setting:
    language: zh_cn   # Chinese UI
    # swagger-model-name: 实体列表   # default: Swagger Models
  basic:   # Swagger Basic authentication, disabled by default
    enable: false
    username: varya
    password: varya
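The `url` value above is handed to the MQTT client as-is, so a typo only shows up when the connection fails. As a minimal sketch using only the JDK, the check below validates such a value at startup. `BrokerUrlCheck`, `parse`, and the list of allowed schemes are assumptions made for illustration and are not part of the project.

```java
import java.net.URI;
import java.util.Set;

/** Hypothetical startup check for the spring.mqtt.url value. */
class BrokerUrlCheck {

    // Assumption: the URI schemes you intend to allow; adjust to your client setup.
    private static final Set<String> ALLOWED_SCHEMES = Set.of("tcp", "ssl", "ws", "wss");

    static URI parse(String url) {
        URI uri = URI.create(url); // throws IllegalArgumentException on malformed input
        String scheme = uri.getScheme();
        if (scheme == null || !ALLOWED_SCHEMES.contains(scheme)) {
            throw new IllegalArgumentException("Unsupported scheme in broker URL: " + url);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Broker URL needs a host and a port: " + url);
        }
        return uri;
    }

    public static void main(String[] args) {
        URI uri = parse("tcp://varin.cn:1883");
        System.out.println(uri.getHost() + " " + uri.getPort()); // prints: varin.cn 1883
    }
}
```

A value such as `varin.cn:1883` (no `tcp://` prefix) is rejected here, which is the kind of mistake that is otherwise easy to miss.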
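4. Create the properties class that reads the MQTT settings from application.yml
package cn.varin.mqttclientapi.entity;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holds the MQTT settings declared under spring.mqtt.
 */
@Data
@Component // registers the class as a bean so it can be @Autowired (not needed if the application already uses @EnableConfigurationProperties)
@ConfigurationProperties(prefix = "spring.mqtt") // binds the values from the yml file
public class MqttConfigProperties {
    private String username;
    private String password;
    private String url;
    private String subClientId;
    private String subTopic;
    private String pubClientId;
}
5. Reference project layout
(Note: the code has been uploaded to the GitCode repository; feel free to read and download it.)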
🧙‍♂️ IV. Core MQTT Configuration (Code)
1. MqttConfig (MQTT configuration class)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttConfig {

    // Settings read from application.yml
    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    // Client factory shared by the inbound and outbound adapters
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        // Create the default factory
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        // Set the connection options
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setUserName(mqttConfigProperties.getUsername());
        mqttConnectOptions.setPassword(mqttConfigProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        defaultMqttPahoClientFactory.setConnectionOptions(mqttConnectOptions);
        return defaultMqttPahoClientFactory;
    }
}
🧙‍♂️ V. MQTT Inbound Message Configuration (Code)
1. MqttInboundConfig (inbound configuration class)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;

/**
 * Configures how inbound (subscribed) messages enter the application.
 */
@Configuration
public class MqttInboundConfig {

    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    // Inbound channel
    @Bean
    public MessageChannel messageInboundChannel() {
        return new DirectChannel();
    }

    // Inbound adapter: subscribes to the configured topics and forwards messages to the channel
    @Bean
    public MessageProducer messageProducer() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfigProperties.getUrl(),
                        mqttConfigProperties.getSubClientId(),
                        mqttPahoClientFactory,
                        mqttConfigProperties.getSubTopic().split(","));
        mqttPahoMessageDrivenChannelAdapter.setQos(2);
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        // Bind the adapter to the inbound channel
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }

    // Alternative: register the message handler here instead of annotating MqttMessageHandle itself.
    // @Bean
    // @ServiceActivator(inputChannel = "messageInboundChannel")
    // public MessageHandler messageHandler() {
    //     return new MqttMessageHandle();
    // }
}
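The adapter above receives its topics from `getSubTopic().split(",")`. Java's `String.split` already drops trailing empty strings, so the trailing comma in the `subTopic` value is harmless, but stray spaces or an empty entry in the middle would still be passed along as topic names. The following is a minimal sketch of a more forgiving parser. `TopicListParser` and `parse` are hypothetical names, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a comma-separated subTopic value into topic filters. */
class TopicListParser {

    static List<String> parse(String raw) {
        List<String> topics = new ArrayList<>();
        if (raw == null) {
            return topics;
        }
        for (String part : raw.split(",")) {
            String topic = part.trim();
            if (!topic.isEmpty()) { // skip blanks such as the gap in "a,,b"
                topics.add(topic);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(parse("mqttx_and_springboot_client/,")); // prints [mqttx_and_springboot_client/]
    }
}
```

If you adopt something like this, the result could be passed to the adapter with `parse(...).toArray(new String[0])`.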
2. Create the inbound message handler (MqttMessageHandle)
package cn.varin.mqttclientapi.handler;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

/**
 * Handles messages that arrive on the inbound channel.
 */
@Component
public class MqttMessageHandle implements MessageHandler {

    @ServiceActivator(inputChannel = "messageInboundChannel") // binds this handler to the inbound channel
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("=================");
        MessageHeaders headers = message.getHeaders();
        // MqttHeaders.RECEIVED_TOPIC is the "mqtt_receivedTopic" header; String.valueOf avoids a NullPointerException if it is absent
        String mqttReceivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
        System.out.println(mqttReceivedTopic);
        System.out.println("=================");
    }
}
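The handler above only prints the topic a message arrived on. Once more than one topic is subscribed, you will usually want to branch on that topic. Below is a minimal sketch of one way to do so with plain JDK types. `TopicDispatcher` is a hypothetical class, and it matches exact topic names only (no MQTT wildcards).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical router: maps an exact topic name to the code that handles its payload. */
class TopicDispatcher {

    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();

    void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    /** Returns true when a handler was registered for the topic and has been invoked. */
    boolean dispatch(String topic, String payload) {
        if (topic == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher();
        dispatcher.register("mqttx_and_springboot_client/",
                payload -> System.out.println("received: " + payload));
        dispatcher.dispatch("mqttx_and_springboot_client/", "hello"); // prints received: hello
    }
}
```

Inside `handleMessage`, the received topic and `String.valueOf(message.getPayload())` would be the two arguments to `dispatch`.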
🧙‍♂️ VI. MQTT Outbound Message Configuration (Code)
1. MqttOutboundConfig (outbound configuration class)
package cn.varin.mqttclientapi.config;

import cn.varin.mqttclientapi.entity.MqttConfigProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    // Outbound channel
    @Bean
    public MessageChannel messageOutboundChannel() {
        return new DirectChannel();
    }

    // Handler that publishes everything sent to the outbound channel
    @ServiceActivator(inputChannel = "messageOutboundChannel")
    @Bean
    public MessageHandler messageOutboundHandle() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttConfigProperties.getUrl(),
                mqttConfigProperties.getPubClientId(),
                mqttPahoClientFactory);
        messageHandler.setDefaultQos(2);           // used when the message carries no QoS header
        messageHandler.setDefaultTopic("default"); // used when the message carries no topic header
        messageHandler.setAsync(true);
        return messageHandler;
    }
}
2. Create the sending gateway (MqttGetway)
package cn.varin.mqttclientapi.getway;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGetway {

    // Publishes with the default QoS configured on the outbound handler
    void send(@Header(value = MqttHeaders.TOPIC) String topic, String payload);

    // Publishes with an explicit QoS
    void send(@Header(value = MqttHeaders.TOPIC) String topic,
              @Header(value = MqttHeaders.QOS) Integer qos,
              String payload);
}
3. Create the MQTT message sending service
package cn.varin.mqttclientapi.service;

public interface MqttMessageSenderService {

    void send(String topic, String payload);

    void send(String topic, Integer qos, String payload);
}

package cn.varin.mqttclientapi.service.impl;

import cn.varin.mqttclientapi.getway.MqttGetway;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqttMessageSenderServiceImpl implements MqttMessageSenderService {

    @Autowired
    private MqttGetway mqttGetway;

    @Override
    public void send(String topic, String payload) {
        mqttGetway.send(topic, payload);
    }

    @Override
    public void send(String topic, Integer qos, String payload) {
        mqttGetway.send(topic, qos, payload);
    }
}
🧙‍♂️ VII. MQTT Message Sending Controller (Code)
package cn.varin.mqttclientapi.controller;

import cn.varin.mqttclientapi.entity.MqttRequestBody;
import cn.varin.mqttclientapi.handler.UnifiedResponseHandler;
import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@Tag(name = "MQTT service API")
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    // Inject the interface rather than the implementation class
    @Autowired
    private MqttMessageSenderService mqttMessageSenderService;

    // The original mapping string was malformed ("/send,有qos", with the description fused into the path).
    // "/send/qos" is an assumed path, chosen here only so the two endpoints do not clash.
    @Operation(summary = "Send a message with an explicit QoS")
    @PostMapping("/send/qos")
    public UnifiedResponseHandler.Result send(@RequestBody MqttRequestBody mqttRequestBody) {
        System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(), mqttRequestBody.getQos(), mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200, "success", null);
    }

    @Operation(summary = "Send a message without a QoS (the default QoS applies)")
    @PostMapping("/send")
    public UnifiedResponseHandler.Result send2(@RequestBody MqttRequestBody mqttRequestBody) {
        System.out.println(mqttRequestBody.toString());
        mqttMessageSenderService.send(mqttRequestBody.getMqtt_topic(), mqttRequestBody.getPayload());
        return new UnifiedResponseHandler.Result(200, "success", null);
    }
}
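The two endpoints differ only in whether the caller supplies a QoS. MQTT defines exactly three QoS levels (0, 1 and 2), and the request body is not validated before it reaches the gateway. As a minimal sketch, a helper like the one below could validate the value and fall back to the default when it is missing. `QosResolver` is a hypothetical name, and its default of 2 simply mirrors `setDefaultQos(2)` in `MqttOutboundConfig`.

```java
/** Hypothetical helper: picks the QoS for a publish, validating the MQTT range 0..2. */
class QosResolver {

    static final int DEFAULT_QOS = 2; // mirrors setDefaultQos(2) in MqttOutboundConfig

    static int resolve(Integer requested) {
        if (requested == null) {
            return DEFAULT_QOS; // the caller did not specify a QoS
        }
        if (requested < 0 || requested > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2 but was " + requested);
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null)); // prints 2
        System.out.println(resolve(1));    // prints 1
    }
}
```

With such a helper, a single endpoint could serve both cases by calling the three-argument `send` with `QosResolver.resolve(mqttRequestBody.getQos())`.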
🧙‍♂️ VIII. MqttTest Test Files (Code)
- Reference directory layout
- Create the test bootstrap class
package cn.varin.mqttclientapi;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MqttClientApiApplicationTests {

    @Test
    void contextLoads() {
    }
}
- Test code
package cn.varin.mqttclientapi.test;

import cn.varin.mqttclientapi.service.MqttMessageSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// A plain @SpringBootTest is enough: the "value" attribute takes property strings, not a class name
@SpringBootTest
public class MqttMesageSenderTest {

    @Autowired
    private MqttMessageSenderService mqttMessageSenderService;

    @Test
    public void MqttMessageSendTest() {
        // Publish a test message
        mqttMessageSenderService.send("java_test/", "testaaa");
    }
}
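🌌🚀 IX. The Grand Messaging Test
Scenario 1: Run the test class
- Click the run button (the green button marked with a red line in the original screenshot)
- Check the test result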
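Scenario 2: HTTP request test (using IntelliJ IDEA's built-in HTTP client)
- Send the following request:
POST http://localhost:9999/mqtt/send
Content-Type: application/json

{
  "mqtt_topic": "java_test/",
  "qos": 2,
  "payload": "h33333ello"
}
(Click the green button marked with the red line.)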
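The same request can also be sent from code. The sketch below uses the JDK's `java.net.http` client (Java 11 or later) and assumes the application is listening on `http://localhost:9999` as configured in application.yml. `MqttSendClient`, `jsonBody`, `buildRequest` and `send` are hypothetical names, and the JSON is assembled by hand to keep the example free of extra dependencies.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for the /mqtt/send endpoint; requires Java 11 or later. */
class MqttSendClient {

    /** Builds the JSON body by hand; only quotes and backslashes are escaped. */
    static String jsonBody(String topic, int qos, String payload) {
        return String.format("{\"mqtt_topic\":\"%s\",\"qos\":%d,\"payload\":\"%s\"}",
                escape(topic), qos, escape(payload));
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static HttpRequest buildRequest(String baseUrl, String topic, int qos, String payload) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mqtt/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody(topic, qos, payload)))
                .build();
    }

    /** Sends the request; call this only while the Spring Boot application is running. */
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:9999", "java_test/", 2, "h33333ello");
        // Only prints what would be sent; use send(request) against a running application.
        System.out.println(request.method() + " " + request.uri());
        System.out.println(jsonBody("java_test/", 2, "h33333ello"));
    }
}
```

For real payloads a JSON library is the safer choice, since the hand-written escaping here covers only quotes and backslashes.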
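- Test result: the controller returns Result(200, "success", null).
- Spring Boot console: the controller prints the request body it received (mqttRequestBody.toString()).
Upgrade complete! Your MQTT program now has Spring Boot's self-casting powers, like holding the Elder Wand, the Invisibility Cloak and the Resurrection Stone all at once. Go conquer the distributed wizarding world! 🎩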
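Troubleshooting
| Symptom | What to check | Remedy |
| --- | --- | --- |
| Connection failure | Firewall rules / open ports | Run netstat -tulnp to confirm the broker ports are listening |
| Message loss | QoS level | Make sure QoS 1 or 2 is used |
| High latency | Network bandwidth / load balancing | Scale the EMQX cluster horizontally |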
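For the "connection failure" row, `netstat` shows what is listening on the server itself. To check reachability from the machine that runs the Spring Boot application, a small TCP probe can help. This is a minimal sketch using only the JDK, and `PortProbe` and `isOpen` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic: checks whether a TCP port accepts connections. */
class PortProbe {

    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unreachable
        }
    }

    public static void main(String[] args) {
        // 1883 is the MQTT listener and 18083 the Dashboard port used in this tutorial.
        System.out.println("1883 open:  " + isOpen("localhost", 1883, 2000));
        System.out.println("18083 open: " + isOpen("localhost", 18083, 2000));
    }
}
```

A `false` result from the client machine while `netstat` shows the port listening on the server usually points at a firewall or security-group rule in between.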
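With this setup you have built a Spring Boot based MQTT messaging system on top of an enterprise-grade broker. The same architecture applies to IoT device management, real-time data collection, remote control and similar scenarios, giving smart hardware and cloud systems a reliable message bridge.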