一文读懂物联网 MQTT 协议之实战篇

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 一文读懂物联网 MQTT 协议之实战篇

一、前言

上一篇我们介绍了 MQTT 协议格式以及相关的特性:一文读懂物联网 MQTT 协议之基础特性篇,这一篇我们就来实战一番,理论得与实践结合,方能吃透 MQTT。

我的那个读者还提到了讲一下 Mosquitto,这是一款开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。

老周这就来带大家在 CentOS 上搭建 Mosquitto 服务器。

二、搭建准备

2.1 软件准备

从官网获取安装包:

wget http://mosquitto.org/files/source/mosquitto-1.4.14.tar.gz

2.2 安装

tar -zxvf mosquitto-1.4.14.tar.gz
cd mosquitto-1.4.14

2.3 修改配置文件

config.mk 包括了多个选项, 可按需关闭或开启,但一旦开启则需要先安装对应的模块。

vim config.mk



WITH_SRV:=yes
WITH_UUID:=yes
WITH_WEBSOCKETS:=yes

2.3.1 安装 c-areas

yum install c-ares-devel -y

2.3.2 安装 lib-uuid

yum install uuid-devel -y
yum install libuuid-devel -y

2.3.3 安装 libwebsockets

cd ~
wget https://github.com/warmcat/libwebsockets/archive/v3.2.1.tar.gz
tar zxvf v3.2.1.tar.gz
cd libwebsockets-3.2.1
mkdir build
cd build
cmake .. -DLIB_SUFFIX=64
make install
ldconfig
cd mosquitto-1.4.14
yum install openssl-devel -y

2.4 编译和安装

make && make install

执行编译 make 命令的时候,如果你的终端出现:

那就把把 WITH_WEBSOCKETS 从 yes 改成 no 后,就可以成功编译了。

WITH_WEBSOCKETS:=yes
改成
WITH_WEBSOCKETS:=no

如果你的应用不需要 websocket 协议,可以把这个参数给设置 no 关掉。

如果终端出现的是这样:

那么恭喜你,Mosquitto 安装成功了。

2.5 说明

程序文件将默认安装到以下位置

路径 程序文件
/usr/local/sbin mosquiotto server
/etc/mosquitto configuration
/usr/local/bin utility command

修正链接库路径

由于操作系统版本及架构原因,很容易出现安装之后的链接库无法被找到,如启动 mosquitto 客户端可能出现找不到 libmosquitto.so.1 文件,因此需要添加链接库路径:

vim /etc/ld.so.conf.d/liblocal.conf

在文件中添加以下内容:

/usr/local/lib64
/usr/local/lib
# 刷新
ldconfig

三、 Mosquitto Server 启动与测试

3.1 启动

3.1.1 mosquitto 默认以 mosquitto 用户启动

可以通过配置文件修改,需添加用户:

groupadd mosquitto
useradd -g mosquitto mosquitto

3.1.2 修改配置文件

mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf
# =================================================================
# General configuration
# =================================================================
# 客户端心跳的间隔时间
#retry_interval 20
# 系统状态的刷新时间
#sys_interval 10
# 系统资源的回收时间,0表示尽快处理
#store_clean_interval 10
# 服务进程的PID
#pid_file /var/run/mosquitto.pid
# 服务进程的系统用户
#user mosquitto
# 客户端心跳消息的最大并发数
#max_inflight_messages 10
# 客户端心跳消息缓存队列
#max_queued_messages 100
# 用于设置客户端长连接的过期时间,默认永不过期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服务绑定的IP地址
#bind_address
# 服务绑定的端口号
#port 1883
# 允许的最大连接数,-1表示没有限制
#max_connections -1
# cafile:CA证书文件
# capath:CA证书目录
# certfile:PEM证书文件
# keyfile:PEM密钥文件
#cafile
#capath
#certfile
#keyfile
# 必须提供证书以保证数据安全性
#require_certificate false
# 若require_certificate值为true,use_identity_as_username也必须为true
#use_identity_as_username false
# 启用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 消息自动保存的间隔时间
#autosave_interval 1800
# 消息自动保存功能的开关
#autosave_on_changes false
# 持久化功能的开关
persistence true
# 持久化DB文件
persistence_file mosquitto.db
# 持久化DB文件目录
persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能
log_dest none
# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否记录客户端连接信息
#connection_messages true
# 是否记录日志时间
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客户端ID的前缀限制,可用于保证安全性
#clientid_prefixes
# 允许匿名用户
#allow_anonymous true
# 用户/密码文件,默认格式:username:password
#password_file
# PSK格式密码文件,默认格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL权限配置,常用语法如下:
# 用户限制:user <username>
# 话题限制:topic [read|write] <topic>
# 正则限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 设置桥接的客户端ID
#clientid
# 桥接断开时,是否清除远程服务器中的消息
#cleansession false
# 是否发布桥接的状态信息
#notifications true
# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 设置桥接的keepalive数值
#keepalive_interval 60
# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic
# 桥接模式automatic的超时时间
#restart_timeout 30
# 桥接模式lazy的超时时间
#idle_timeout 60
# 桥接客户端的用户名
#username
# 桥接客户端的密码
#password
# bridge_cafile:桥接客户端的CA证书文件
# bridge_capath:桥接客户端的CA证书目录
# bridge_certfile:桥接客户端的PEM证书文件
# bridge_keyfile:桥接客户端的PEM密钥文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile

关于详细配置可参考:http://mosquitto.org/man/mosquitto-conf-5.html

3.1.3 设置用户名和密码

将配置文件中 #allow_anonymous true 去掉注释,设置为 false#password_file 去掉注释并添加密码文件保存的位置:

allow_anonymous false
password_file /etc/mosquitto/pwfile.example
mosquitto_passwd -c /etc/mosquitto/pwfile.example 用户名
之后需输入两次密码
注意如果想添加用户
mosquitto_passwd -b /etc/mosquitto/pwfile.example 用户名 密码

同样连续会提示连续输入两次密码。注意第二次创建用户时不用加 -c 如果加 -c 会把第一次创建的用户覆盖。

3.1.4 启动 mosquitto

mosquitto -c /etc/mosquitto/mosquitto.conf -d

成功将启动并监听 1883 端口

3.2 测试

新建两个 shell 窗口 A/B

A 订阅主题:

mosquitto_sub -t 主题名 -h 主机IP -u 用户名 -P 密码
例如:mosquitto_sub -t topic-riemann -h localhost -u mosquitto -P mosquitto

B 推送消息:

mosquitto_pub -t 主题名 -h 主机IP -m "消息内容" -u 用户名 -P 密码
例如:mosquitto_pub -t topic-riemann -h localhost -m "hello,mqtt" -u mosquitto -P mosquitto

3.3 可能遇到的问题

如果你出现这个错误:

mosquitto_sub: error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory

解决方法:

编译完 mosquitto 之后,进入到 lib 目录下,将编译之后的 libmosquitto.so.1 拷贝到目录 /usr/local/lib下,执行如下命令:

cp libmosquitto.so.1 /usr/local/lib

然后再执行命令:

sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
ldconfig

3.4 测试结果


四、Java 实现 Mosquitto 客户端

4.1 项目结构图

4.2 添加 pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
        <version>5.2.5.RELEASE</version>
    </dependency>
</dependencies>

4.3 application.yml

mqtt:
  host: tcp://服务器IP:1883
  clientId: client_${random.value}
  topic: test/system/module/biz
  qoslevel: 1
  username: mosquitto
  password: mosquitto
  timeout: 10000
  keepalive: 20
server:
  port: 8888

4.4 MqttConfig

/**
 * @author: 微信公众号【老周聊架构】
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.host}")
    private String hostUrl;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.topic}")
    private String defaultTopic;
    // 连接超时
    @Value("${mqtt.timeout}")
    private int completionTimeout;
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
    // 接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
    // 配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound",
                mqttClientFactory(), "test/#");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    // 通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String)message.getHeaders().get("mqtt_receivedTopic");
            log.info("主题:{},消息接收到的数据:{}", topic, message.getPayload());
        };
    }
}

4.5 MqttGateWay

/**
 * @author: 微信公众号【老周聊架构】
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {
    // 定义重载方法,用于消息发送
    void sendToMqtt(String payload);
    // 指定topic进行消息发送
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

4.6 MqttController 控制类

/**
 * @author: 微信公众号【老周聊架构】
 */
@Slf4j
@RestController
@RequestMapping("/api")
public class MqttController {
    @Autowired
    MqttGateWay mqttGateWay;
    @PostMapping("/publish")
    public String publish(@RequestHeader(value = "toplic") String toplic , String message) {
        log.info(String.format("topic: %s, message: %s", toplic, message));
        mqttGateWay.sendToMqtt(toplic, message);
        return "success";
    }
}

4.7 MqttApplication 启动类

/**
 * @author: 微信公众号【老周聊架构】
 */
@SpringBootApplication
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}

4.8 启动 mosquitto 服务器

mosquitto -c /etc/mosquitto/mosquitto.conf -d

4.9 利用 IDEA 的 HTTP Client 模拟 HTTP 请求



4.10 测试结果

IDEA 控制台接收到该主题的消息:


shell 终端显示也收到了订阅了该主题的消息:



欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


喜欢的话,点赞、再看、分享三连。

相关实践学习
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
13天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
152 1
|
13天前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
易易互联科技有限公司是吉利集团旗下专注于换电生态的全资子公司,致力于打造安全、便捷、便宜的智能换电网络。公司依托吉利GBRC换电平台,基于电池共享与车辆全生命周期运营,已布局超470座换电站,覆盖40多个城市,计划2027年达2000座。面对海量设备高并发连接、高实时性要求及数据洪峰挑战,易易互联采用阿里云MQTT与RocketMQ构建高效物联网通信架构,实现稳定接入、低延迟通信与弹性处理,全面支撑其全国换电网络规模化运营与智能化升级。
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
|
7天前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
59 6
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
2331 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
5月前
|
物联网
(手把手)在华为云、阿里云搭建自己的物联网MQTT消息服务器,免费IOT平台
本文介绍如何在阿里云搭建自己的物联网MQTT消息服务器,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
1885 42
|
6月前
|
数据采集 监控 网络协议
​MCP协议深度解析:原理、应用与物联网时代的机遇-优雅草卓伊凡
​MCP协议深度解析:原理、应用与物联网时代的机遇-优雅草卓伊凡
567 40
​MCP协议深度解析:原理、应用与物联网时代的机遇-优雅草卓伊凡
|
5月前
|
物联网
如何在腾讯云等平台搭建自己的物联网MQTT服务器Broker
物联网技术及MQTT协议被广泛应用于各种场景。本文介绍物联网MQTT服务助手下载,如何搭建自己的物联网平台,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
407 37
|
4月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
574 0
|
6月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
3月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。

热门文章

最新文章