【开源视频联动物联网平台】vertx写一个mqtt服务端

简介: 【开源视频联动物联网平台】vertx写一个mqtt服务端
/**
 * VertxMqttServer: 一个基于Vert.x实现的MQTT服务器。
 * 该服务器支持设备连接、认证、消息发布和订阅功能。
 */
package com.mz.network.mqtt;
 
import com.alibaba.fastjson.JSON;
import com.mz.network.authority.DeviceAuthorityService;
import com.mz.network.client.Client;
import com.mz.network.client.ClientRepository;
import com.mz.network.client.message.ClientMessage;
import com.mz.network.core.Topics;
import com.mz.network.events.CommandReplyEvent;
import com.mz.network.events.DeviceReportEvent;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Slf4j
public class VertxMqttServer extends AbstractVerticle {
 
    @Autowired
    private ClientRepository clientRepository;
 
    @Value("${vertx.service-id}")
    private String serviceId;
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    @Autowired
    private MqttServerOptions mqttServerOptions;
 
    @Autowired
    private DeviceAuthorityService authorityService;
 
    // 用于存储订阅关系的映射
    private final Map<String, MqttEndpoint> subscribers = new ConcurrentHashMap<>();
 
    /**
     * 重写AbstractVerticle的start方法,实现MQTT服务器的启动和初始化。
     * @throws Exception 启动过程中可能抛出的异常
     */
    @Override
    public void start() throws Exception {
        // 创建MQTT服务器实例
        MqttServer mqttServer = MqttServer.create(vertx, mqttServerOptions);
        // 处理MQTT客户端连接请求
        mqttServer.endpointHandler(mqttEndpoint -> {
            String clientId = mqttEndpoint.clientIdentifier();
            log.debug("接收到MQTT客户端[{}]消息", clientId);
            // 执行创建连接
            doConnect(mqttEndpoint);
        }).listen(result -> {
            if (result.succeeded()) {
                int port = mqttServer.actualPort();
                log.debug("MQTT server started on port {}", port);
            } else {
                log.warn("MQTT server start failed", result.cause());
            }
        });
    }
 
    /**
     * 处理MQTT客户端连接请求。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void doConnect(MqttEndpoint endpoint) {
        if (endpoint.auth() == null) {
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            return;
        }
        String userName = endpoint.auth().userName();
        String passWord = endpoint.auth().password();
 
        if (authorityService.verification(endpoint.clientIdentifier(), userName, passWord)) {
            log.debug("MQTT客户端:{}认证通过", endpoint.clientIdentifier());
            acceptConnect(endpoint);
        } else {
            log.warn("客户端[{}]用户名密码错误", endpoint.clientIdentifier());
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }
 
    /**
     * 接受MQTT客户端连接,并进行订阅处理。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void acceptConnect(MqttEndpoint endpoint) {
        String clientId = endpoint.clientIdentifier();
        MqttClienttest client = new MqttClienttest(endpoint);
 
        endpoint.accept(false)
                .closeHandler(v -> {
                    log.debug("[{}] closed", clientId);
                    Client old = clientRepository.getClient(clientId);
                    if (old == client) {
                        clientRepository.unregister(clientId);
                    } else {
                        log.debug("client {} is unregistered", client);
                    }
                })
                .subscribeHandler(subscribe -> {
                    List<MqttQoS> grantedQosLevels = new ArrayList<>();
                    for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
                        log.info("[{}] Subscription for {} with QoS {}", clientId, s.topicName(), s.qualityOfService());
                        grantedQosLevels.add(s.qualityOfService());
                        subscribers.put(s.topicName(), endpoint);
                    }
                    // ack the subscriptions request
                    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
 
                    // specifing handlers for handling QoS 1 and 2
                    endpoint.publishAcknowledgeHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId))
                            .publishReceivedHandler(endpoint::publishRelease)
                            .publishCompletionHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId));
 
                })
                .unsubscribeHandler(unsubscribe -> {
                    unsubscribe.topics().forEach(topicName -> {
                        log.info("[{}] Unsubscription for {}", clientId, topicName);
                        // Remove the endpoint when a client unsubscribes
                        subscribers.remove(topicName);
                    });
                    // ack the subscriptions request
                    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
                })
                .disconnectHandler(v -> log.info("[{}] Received disconnect from client", clientId))
                .exceptionHandler(e -> log.error(clientId, e))
                .publishHandler(message -> {
                    //设备推送了消息
                    String topicName = message.topicName();
                    Buffer buffer = message.payload();
                    String payload = buffer.toString();
 
                    log.info("接受到客户端消息推送:[{}] payload [{}] with QoS [{}]", topicName, payload, message.qosLevel());
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }
 
                    //往订阅的客户端发送消息
                    handlePublish(message);
                    //平台物模型处理
                    try {
                        ClientMessage event = null;
                        //目前仅支持reply和report的topic
                        if (Topics.reply.equals(topicName)) {
                            event = JSON.parseObject(payload, CommandReplyEvent.class);
                        } else if (Topics.report.equals(topicName)) {
                            event = JSON.parseObject(payload, DeviceReportEvent.class);
                        }
                        if (null != event) {
                            event.setClientId(clientId);
                            //发布事件到spring
                            eventPublisher.publishEvent(event);
                        } else {
                            log.warn("不支持的topic:{} => {}", topicName, payload);
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                })
                .publishReleaseHandler(messageId -> {
                    log.debug("complete message :{}", messageId);
                    endpoint.publishComplete(messageId);
                });
        //注册设备
        clientRepository.register(client);
    }
 
    /**
     * 处理MQTT消息的发布,包括订阅关系的转发。
     * @param message MqttPublishMessage实例,代表接收到的MQTT消息
     */
    private void handlePublish(MqttPublishMessage message) {
        // Handle incoming publish messages
        String topic = message.topicName();
        String payload = message.payload().toString();
        int qos = message.qosLevel().value();
 
        System.out.println("Received message on [" + topic + "] payload [" + payload + "] with QoS " + qos);
 
        // Forward the message to subscribers matching the topic
        subscribers.forEach((subscribedTopic, subscriberEndpoint) -> {
            if (topicMatches(subscribedTopic, topic)) {
                // Handle different QoS levels
                switch (qos) {
                    case 0:
                        // QoS 0: At most once delivery, no acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false);
                        System.out.println("Message forwarded to [" + subscribedTopic + "] with QoS 0");
                        break;
                    case 1:
                    case 2:
                        // QoS 1 and QoS 2: Acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false,
                            publishResult -> handlePublishResult(publishResult, subscribedTopic, qos));
                        break;
                    default:
                        System.err.println("Unsupported QoS level: " + qos);
                }
            }
        });
    }
 
    /**
     * 处理MQTT消息发布的结果。
     * @param publishResult AsyncResult实例,代表MQTT消息发布的异步结果
     * @param topic 订阅主题
     * @param qos QoS级别
     */
    private void handlePublishResult(AsyncResult<Integer> publishResult, String topic, int qos) {
        if (publishResult.succeeded()) {
            System.out.println("Message forwarded to subscribers of [" + topic + "] with QoS " + qos);
        } else {
            System.err.println("Failed to forward message to [" + topic + "] with QoS " + qos +
                ": " + publishResult.cause().getMessage());
        }
    }
 
    /**
     * 判断订阅主题是否匹配实际主题。
     * @param subscribedTopic 订阅主题
     * @param actualTopic 实际主题
     * @return 是否匹配
     */
    private boolean topicMatches(String subscribedTopic, String actualTopic) {
        String[] subscribedParts = subscribedTopic.split("/");
        String[] actualParts = actualTopic.split("/");
 
        if (subscribedParts.length != actualParts.length) {
            return false;
        }
 
        for (int i = 0; i < subscribedParts.length; i++) {
            if (!subscribedParts[i].equals("+") && !subscribedParts[i].equals(actualParts[i])) {
                return false;
            }
        }
 
        return true;
    }
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
23天前
|
监控 网络协议 物联网
你知道什么是物联网MQTT么?
你知道什么是物联网MQTT么?
29 0
|
2月前
|
JavaScript 小程序 前端开发
【手把手教教学物联网项目】01 视频大纲
《手把手教教学物联网项目》是一系列视频教程,旨在引导初学者掌握物联网技术。视频涵盖物联网基础,如物联网概述、架构和技术;STM32微控制器的介绍、编程及外设使用;网关开发,涉及ESP8266和ESP32;物联网通信协议如TCP、MQTT、Modbus等;物联网总线协议如单总线、CAN、IIC和SPI;OLED显示原理与驱动;MQTT服务器搭建;物联网云平台介绍,包括阿里云平台的使用;微信小程序开发入门及前端VUE项目实践。此外,教程还涉及UniAPP和SpringBoot后台开发,最后通过“智能取餐柜”项目将理论知识付诸实践。视频可在B站找到,适合学生、爱好者和开发人员学习物联网技术。
113 12
【手把手教教学物联网项目】01 视频大纲
|
2月前
|
消息中间件 网络协议 物联网
MQTT常见问题之物联网设备端申请动态注册时MQTT服务不可用如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
XML 编解码 JSON
【开源视频联动物联网平台】协议包管理
【开源视频联动物联网平台】协议包管理
63 1
|
2月前
|
传感器 边缘计算 物联网
基于ELF 1S开发板完成的物联网开源
项目包含云、网、边、端四部分,采用涂鸦云作为云服务器,便于初学者接入。ELF 1S开发板作为边缘中控,运行Linux+Qt,通过Wi-Fi连接云服务器。开发板通过USB无线模块与端侧设备通信,支持AT指令和功能扩展。项目提供5个Qt应用界面,包括电器控制、环境监测、云服务器连接、有线网络和参数设置,可与手机APP交互。端侧设备包括Modbus-RTU从机和无线网络模块。整个项目已在Gitee开源。
48 4
|
16天前
|
物联网
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
好的资源链接,gitee全糖咖啡,B站视频转成mp4,全糖咖啡 / 物联网网关数据上传,,全糖咖啡 / springboot+百度智能车牌检测
|
2月前
|
存储 关系型数据库 物联网
【PolarDB开源】PolarDB在物联网(IoT)数据存储中的应用探索
【5月更文挑战第27天】PolarDB,阿里云的高性能云数据库,针对物联网(IoT)数据存储的挑战,如大规模数据、实时性及多样性,展现出高扩展性、高性能和高可靠性。它采用分布式架构,支持动态扩展,保证99.95%的高可用性,并能处理结构化、半结构化和非结构化数据。通过SDK实现数据实时写入,支持SQL查询和冷热数据分层,有效降低成本。随着IoT发展,PolarDB在该领域的应用将更加广泛。
147 1
|
1月前
|
传感器 物联网
物联网协议概述:MQTT、CoAP 和 HTTP
【6月更文挑战第3天】探索物联网的三大协议——MQTT、CoAP 和 HTTP。MQTT 是高效的消息传递使者,适用于大规模、不稳定网络环境;CoAP 小巧灵活,适合资源有限的设备;HTTP 则是熟悉的网络通信老将。根据不同场景选择合适的协议,让物联网设备有效交流。示例代码展示它们的使用方式。
64 0
|
2月前
|
JSON 缓存 物联网
推荐一款go语言的开源物联网框架-opengw
推荐一款go语言的开源物联网框架-opengw
78 4
|
3天前
|
供应链 监控 物联网
未来已来:探索区块链、物联网与虚拟现实技术的融合革新
【7月更文挑战第18天】 在数字技术不断演进的今天,区块链、物联网(IoT)和虚拟现实(VR)等新兴技术正逐渐改变我们的工作和生活方式。本文旨在深入探讨这些技术的独特发展趋势及其在多个行业中的创新应用。我们将看到,随着技术的成熟和应用场景的拓展,这些领域不仅独立发展,更在相互融合中催生出新的增长点和解决方案。