MQTT协议

简介: mqtt介绍及应用

介绍

MQTT是一种机器对机器 (M2M) 的物联网连接协议。它被设计为一个极其轻量级的发布和订阅消息传输。它对于与需要少量代码和/或网络带宽非常宝贵的远程位置的连接非常有用。

每个 MQTT 客户端订阅某些主题并在发布者开始推送这些主题的消息时接收消息。

MQTT协议实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

如何向外扩展

水平扩展的目的是在同一应用程序的多个实例之间分配负载。如果这些实例中的 MQTT 客户端订阅了相同的主题,那么相同的 MQTT 消息将被传递到每个实例,这不是预期的。
image.png

安装MQTT服务(EMQ为例)

  • 获取 Docker 镜像
docker pull emqx/emqx:4.3.7
  • 启动 Docker 容器
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.3.7
  • 启动

当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,通过访问
http://localhost:18083 来查看Dashboard,默认用户名是 admin ,密码是 public 。
image.png

安装MQTT客户端

image.png
https://gitee.com/emqx/MQTTX

image.png

整合springboot

image.png

  • 添加maven
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
            <scope>compile</scope>
        </dependency>
  • 配置文件


mqtt:
  inbound:
    url: tcp://bs-emqx:1883
    clientId: bs:iot:mqtt:client:subscribe
    topics: $queue/mqtt/face/#,mqtt/Ack/#,mqtt/HeartBeat,mqtt/WillTopic,mqtt/DataStreamAck/#,mqtt/CaptureEvent/#
  outbound:
    url: tcp://xxx:1883
    username: xxx
    password: xxx
    clientId: xxx
    topic: mqtt/face/#
  • 消息订阅配置


import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.bszn.iot.mqtt.handler.BdForwardHandler;
import com.bszn.iot.mqtt.handler.ForwardHandler;
import com.bszn.iot.mqtt.model.PushPayload;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

/**
 * @Auther: gc.x
 * @Date: 2020/6/11 9:27
 * @Description: 消息订阅配置
 */
@Component
@Configuration
@Slf4j
public class MqttInboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private ForwardHandler forwardHandler;
    @Autowired
    private BdForwardHandler bdForwardHandler;
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }




    /**
     * MQTT 消息订阅绑定(消费者)
     * @param mqttPahoClientFactory
     * @return
     */
    @Bean
    public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) {
        String[] inboundTopics = mqttProperties.getInbound().getTopics().split(",");
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getClientId(),
                        mqttPahoClientFactory,inboundTopics);
        log.info("client:{} 绑定消费者", mqttProperties.getInbound().getClientId());
        adapter.setCompletionTimeout(5000);
        // 配置默认Paho消息转换器(qos=0, retain=false, charset=UTF-8)
        adapter.setConverter(new DefaultPahoMessageConverter());

        /*
         * 设置Qos等级
         * level 0:最多一次的传输
         * level 1:至少一次的传输
         * level 2: 有且只有一次传输 保证消息投递成功
         */

        adapter.setQos(2);
        //设置订阅通道
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            try{
                String vendorName = getVendorName(message.getPayload().toString());
                // 入口 分发不同消息
                switch (vendorName){
                    case "hqvt":
                        PushPayload<JSONObject> pushPayload = JSONObject.parseObject(message.getPayload().toString(), new TypeReference<PushPayload<JSONObject>>(){});
                        this.forwardHandler.receive(pushPayload);
                        break;
                    case "bigdragon":
                        JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
                        this.bdForwardHandler.receive(jsonObject);
                        break;



                }
            }catch (Exception e){
                log.error("MQTT's message parse JSONObject fail:{}", e.getMessage());
            }
        };
    }

    private String getVendorName(String message){
        JSONObject jsonObject = JSONObject.parseObject(message);
        if(StringUtils.isNotBlank(jsonObject.getString("operator"))){
            return "hqvt";
        }else if(StringUtils.isNotBlank(jsonObject.getString("Action"))){
            return "bigdragon";
        }
        return "";
    }



}
  • 消息发布配置


import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @Auther: gc.x
 * @Date: 2020/6/11 9:34
 * @Description: 消息发送
 */
@Configuration
@Slf4j
public class MqttOutboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;


    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        // 设置连接的用户名
        options.setUserName(mqttProperties.getOutbound().getUsername());
        // 设置连接的密码
        options.setPassword(mqttProperties.getOutbound().getPassword().toCharArray());
        options.setServerURIs(mqttProperties.getOutbound().getUrl().split(","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        options.setMaxInflight(1000);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory());
        //BsMqttPahoMessageHandler messageHandler = new BsMqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory());
        log.info("client:{} 绑定生产者者", mqttProperties.getOutbound().getClientId());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());
        //messageHandler.onInit();
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 物联网 网络性能优化
MQTT常见问题之MQTT不支持5.0的协议如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
13天前
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
38 5
|
6月前
|
传感器 网络协议 Ubuntu
MQTT协议与EMQ
MQTT协议与EMQ
160 0
|
2月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
157 0
|
3月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
103 0
|
3月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
78 0
|
5月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
142 1
|
5月前
|
消息中间件 存储 RocketMQ
消息队列 MQ产品使用合集之Remoting协议是否可以直接和proxy交互的吗
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
5月前
|
传感器 物联网
物联网协议概述:MQTT、CoAP 和 HTTP
【6月更文挑战第3天】探索物联网的三大协议——MQTT、CoAP 和 HTTP。MQTT 是高效的消息传递使者,适用于大规模、不稳定网络环境;CoAP 小巧灵活,适合资源有限的设备;HTTP 则是熟悉的网络通信老将。根据不同场景选择合适的协议,让物联网设备有效交流。示例代码展示它们的使用方式。
162 0

热门文章

最新文章