【开源视频联动物联网平台】vertx写一个mqtt服务端

简介: 【开源视频联动物联网平台】vertx写一个mqtt服务端
/**
 * VertxMqttServer: 一个基于Vert.x实现的MQTT服务器。
 * 该服务器支持设备连接、认证、消息发布和订阅功能。
 */
package com.mz.network.mqtt;
 
import com.alibaba.fastjson.JSON;
import com.mz.network.authority.DeviceAuthorityService;
import com.mz.network.client.Client;
import com.mz.network.client.ClientRepository;
import com.mz.network.client.message.ClientMessage;
import com.mz.network.core.Topics;
import com.mz.network.events.CommandReplyEvent;
import com.mz.network.events.DeviceReportEvent;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Slf4j
public class VertxMqttServer extends AbstractVerticle {
 
    @Autowired
    private ClientRepository clientRepository;
 
    @Value("${vertx.service-id}")
    private String serviceId;
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    @Autowired
    private MqttServerOptions mqttServerOptions;
 
    @Autowired
    private DeviceAuthorityService authorityService;
 
    // 用于存储订阅关系的映射
    private final Map<String, MqttEndpoint> subscribers = new ConcurrentHashMap<>();
 
    /**
     * 重写AbstractVerticle的start方法,实现MQTT服务器的启动和初始化。
     * @throws Exception 启动过程中可能抛出的异常
     */
    @Override
    public void start() throws Exception {
        // 创建MQTT服务器实例
        MqttServer mqttServer = MqttServer.create(vertx, mqttServerOptions);
        // 处理MQTT客户端连接请求
        mqttServer.endpointHandler(mqttEndpoint -> {
            String clientId = mqttEndpoint.clientIdentifier();
            log.debug("接收到MQTT客户端[{}]消息", clientId);
            // 执行创建连接
            doConnect(mqttEndpoint);
        }).listen(result -> {
            if (result.succeeded()) {
                int port = mqttServer.actualPort();
                log.debug("MQTT server started on port {}", port);
            } else {
                log.warn("MQTT server start failed", result.cause());
            }
        });
    }
 
    /**
     * 处理MQTT客户端连接请求。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void doConnect(MqttEndpoint endpoint) {
        if (endpoint.auth() == null) {
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            return;
        }
        String userName = endpoint.auth().userName();
        String passWord = endpoint.auth().password();
 
        if (authorityService.verification(endpoint.clientIdentifier(), userName, passWord)) {
            log.debug("MQTT客户端:{}认证通过", endpoint.clientIdentifier());
            acceptConnect(endpoint);
        } else {
            log.warn("客户端[{}]用户名密码错误", endpoint.clientIdentifier());
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }
 
    /**
     * 接受MQTT客户端连接,并进行订阅处理。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void acceptConnect(MqttEndpoint endpoint) {
        String clientId = endpoint.clientIdentifier();
        MqttClienttest client = new MqttClienttest(endpoint);
 
        endpoint.accept(false)
                .closeHandler(v -> {
                    log.debug("[{}] closed", clientId);
                    Client old = clientRepository.getClient(clientId);
                    if (old == client) {
                        clientRepository.unregister(clientId);
                    } else {
                        log.debug("client {} is unregistered", client);
                    }
                })
                .subscribeHandler(subscribe -> {
                    List<MqttQoS> grantedQosLevels = new ArrayList<>();
                    for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
                        log.info("[{}] Subscription for {} with QoS {}", clientId, s.topicName(), s.qualityOfService());
                        grantedQosLevels.add(s.qualityOfService());
                        subscribers.put(s.topicName(), endpoint);
                    }
                    // ack the subscriptions request
                    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
 
                    // specifing handlers for handling QoS 1 and 2
                    endpoint.publishAcknowledgeHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId))
                            .publishReceivedHandler(endpoint::publishRelease)
                            .publishCompletionHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId));
 
                })
                .unsubscribeHandler(unsubscribe -> {
                    unsubscribe.topics().forEach(topicName -> {
                        log.info("[{}] Unsubscription for {}", clientId, topicName);
                        // Remove the endpoint when a client unsubscribes
                        subscribers.remove(topicName);
                    });
                    // ack the subscriptions request
                    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
                })
                .disconnectHandler(v -> log.info("[{}] Received disconnect from client", clientId))
                .exceptionHandler(e -> log.error(clientId, e))
                .publishHandler(message -> {
                    //设备推送了消息
                    String topicName = message.topicName();
                    Buffer buffer = message.payload();
                    String payload = buffer.toString();
 
                    log.info("接受到客户端消息推送:[{}] payload [{}] with QoS [{}]", topicName, payload, message.qosLevel());
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }
 
                    //往订阅的客户端发送消息
                    handlePublish(message);
                    //平台物模型处理
                    try {
                        ClientMessage event = null;
                        //目前仅支持reply和report的topic
                        if (Topics.reply.equals(topicName)) {
                            event = JSON.parseObject(payload, CommandReplyEvent.class);
                        } else if (Topics.report.equals(topicName)) {
                            event = JSON.parseObject(payload, DeviceReportEvent.class);
                        }
                        if (null != event) {
                            event.setClientId(clientId);
                            //发布事件到spring
                            eventPublisher.publishEvent(event);
                        } else {
                            log.warn("不支持的topic:{} => {}", topicName, payload);
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                })
                .publishReleaseHandler(messageId -> {
                    log.debug("complete message :{}", messageId);
                    endpoint.publishComplete(messageId);
                });
        //注册设备
        clientRepository.register(client);
    }
 
    /**
     * 处理MQTT消息的发布,包括订阅关系的转发。
     * @param message MqttPublishMessage实例,代表接收到的MQTT消息
     */
    private void handlePublish(MqttPublishMessage message) {
        // Handle incoming publish messages
        String topic = message.topicName();
        String payload = message.payload().toString();
        int qos = message.qosLevel().value();
 
        System.out.println("Received message on [" + topic + "] payload [" + payload + "] with QoS " + qos);
 
        // Forward the message to subscribers matching the topic
        subscribers.forEach((subscribedTopic, subscriberEndpoint) -> {
            if (topicMatches(subscribedTopic, topic)) {
                // Handle different QoS levels
                switch (qos) {
                    case 0:
                        // QoS 0: At most once delivery, no acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false);
                        System.out.println("Message forwarded to [" + subscribedTopic + "] with QoS 0");
                        break;
                    case 1:
                    case 2:
                        // QoS 1 and QoS 2: Acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false,
                            publishResult -> handlePublishResult(publishResult, subscribedTopic, qos));
                        break;
                    default:
                        System.err.println("Unsupported QoS level: " + qos);
                }
            }
        });
    }
 
    /**
     * 处理MQTT消息发布的结果。
     * @param publishResult AsyncResult实例,代表MQTT消息发布的异步结果
     * @param topic 订阅主题
     * @param qos QoS级别
     */
    private void handlePublishResult(AsyncResult<Integer> publishResult, String topic, int qos) {
        if (publishResult.succeeded()) {
            System.out.println("Message forwarded to subscribers of [" + topic + "] with QoS " + qos);
        } else {
            System.err.println("Failed to forward message to [" + topic + "] with QoS " + qos +
                ": " + publishResult.cause().getMessage());
        }
    }
 
    /**
     * 判断订阅主题是否匹配实际主题。
     * @param subscribedTopic 订阅主题
     * @param actualTopic 实际主题
     * @return 是否匹配
     */
    private boolean topicMatches(String subscribedTopic, String actualTopic) {
        String[] subscribedParts = subscribedTopic.split("/");
        String[] actualParts = actualTopic.split("/");
 
        if (subscribedParts.length != actualParts.length) {
            return false;
        }
 
        for (int i = 0; i < subscribedParts.length; i++) {
            if (!subscribedParts[i].equals("+") && !subscribedParts[i].equals(actualParts[i])) {
                return false;
            }
        }
 
        return true;
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
监控 网络协议 物联网
你知道什么是物联网MQTT么?
你知道什么是物联网MQTT么?
75 0
|
12天前
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
36 5
|
16天前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
39 3
|
2月前
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【9月更文挑战第3天】物联网(IoT)的兴起催生了多种通信协议,如MQTT、CoAP、RESTful/HTTP和XMPP,各自适用于不同场景。本文将对比这些协议的特点、优缺点,并提供示例代码。MQTT轻量级且支持QoS,适合大规模部署;CoAP基于UDP,适用于低功耗网络;RESTful/HTTP易于集成但不适合资源受限设备;XMPP支持双向通信,适合复杂交互应用。通过本文,开发者可更好地选择合适的物联网通信协议。
37 2
|
3月前
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【8月更文挑战第14天】本文概览了MQTT、CoAP、RESTful/HTTP及XMPP四种物联网通信协议。MQTT采用发布/订阅模式,轻量高效;CoAP针对资源受限设备,基于UDP,低延迟;RESTful/HTTP易于集成现有Web基础设施;XMPP支持双向通信,扩展性强。每种协议均附有示例代码,助您根据不同场景和设备特性作出最佳选择。
35 5
|
4月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
160 12
|
3月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
94 0
|
3月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
69 0
|
4月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。