【开源视频联动物联网平台】vertx写一个mqtt服务端

简介: 【开源视频联动物联网平台】vertx写一个mqtt服务端
/**
 * VertxMqttServer: 一个基于Vert.x实现的MQTT服务器。
 * 该服务器支持设备连接、认证、消息发布和订阅功能。
 */
package com.mz.network.mqtt;
 
import com.alibaba.fastjson.JSON;
import com.mz.network.authority.DeviceAuthorityService;
import com.mz.network.client.Client;
import com.mz.network.client.ClientRepository;
import com.mz.network.client.message.ClientMessage;
import com.mz.network.core.Topics;
import com.mz.network.events.CommandReplyEvent;
import com.mz.network.events.DeviceReportEvent;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Slf4j
public class VertxMqttServer extends AbstractVerticle {
 
    @Autowired
    private ClientRepository clientRepository;
 
    @Value("${vertx.service-id}")
    private String serviceId;
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    @Autowired
    private MqttServerOptions mqttServerOptions;
 
    @Autowired
    private DeviceAuthorityService authorityService;
 
    // 用于存储订阅关系的映射
    private final Map<String, MqttEndpoint> subscribers = new ConcurrentHashMap<>();
 
    /**
     * 重写AbstractVerticle的start方法,实现MQTT服务器的启动和初始化。
     * @throws Exception 启动过程中可能抛出的异常
     */
    @Override
    public void start() throws Exception {
        // 创建MQTT服务器实例
        MqttServer mqttServer = MqttServer.create(vertx, mqttServerOptions);
        // 处理MQTT客户端连接请求
        mqttServer.endpointHandler(mqttEndpoint -> {
            String clientId = mqttEndpoint.clientIdentifier();
            log.debug("接收到MQTT客户端[{}]消息", clientId);
            // 执行创建连接
            doConnect(mqttEndpoint);
        }).listen(result -> {
            if (result.succeeded()) {
                int port = mqttServer.actualPort();
                log.debug("MQTT server started on port {}", port);
            } else {
                log.warn("MQTT server start failed", result.cause());
            }
        });
    }
 
    /**
     * 处理MQTT客户端连接请求。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void doConnect(MqttEndpoint endpoint) {
        if (endpoint.auth() == null) {
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
            return;
        }
        String userName = endpoint.auth().userName();
        String passWord = endpoint.auth().password();
 
        if (authorityService.verification(endpoint.clientIdentifier(), userName, passWord)) {
            log.debug("MQTT客户端:{}认证通过", endpoint.clientIdentifier());
            acceptConnect(endpoint);
        } else {
            log.warn("客户端[{}]用户名密码错误", endpoint.clientIdentifier());
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }
 
    /**
     * 接受MQTT客户端连接,并进行订阅处理。
     * @param endpoint MqttEndpoint实例,代表与客户端的连接
     */
    protected void acceptConnect(MqttEndpoint endpoint) {
        String clientId = endpoint.clientIdentifier();
        MqttClienttest client = new MqttClienttest(endpoint);
 
        endpoint.accept(false)
                .closeHandler(v -> {
                    log.debug("[{}] closed", clientId);
                    Client old = clientRepository.getClient(clientId);
                    if (old == client) {
                        clientRepository.unregister(clientId);
                    } else {
                        log.debug("client {} is unregistered", client);
                    }
                })
                .subscribeHandler(subscribe -> {
                    List<MqttQoS> grantedQosLevels = new ArrayList<>();
                    for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
                        log.info("[{}] Subscription for {} with QoS {}", clientId, s.topicName(), s.qualityOfService());
                        grantedQosLevels.add(s.qualityOfService());
                        subscribers.put(s.topicName(), endpoint);
                    }
                    // ack the subscriptions request
                    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
 
                    // specifing handlers for handling QoS 1 and 2
                    endpoint.publishAcknowledgeHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId))
                            .publishReceivedHandler(endpoint::publishRelease)
                            .publishCompletionHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId));
 
                })
                .unsubscribeHandler(unsubscribe -> {
                    unsubscribe.topics().forEach(topicName -> {
                        log.info("[{}] Unsubscription for {}", clientId, topicName);
                        // Remove the endpoint when a client unsubscribes
                        subscribers.remove(topicName);
                    });
                    // ack the subscriptions request
                    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
                })
                .disconnectHandler(v -> log.info("[{}] Received disconnect from client", clientId))
                .exceptionHandler(e -> log.error(clientId, e))
                .publishHandler(message -> {
                    //设备推送了消息
                    String topicName = message.topicName();
                    Buffer buffer = message.payload();
                    String payload = buffer.toString();
 
                    log.info("接受到客户端消息推送:[{}] payload [{}] with QoS [{}]", topicName, payload, message.qosLevel());
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }
 
                    //往订阅的客户端发送消息
                    handlePublish(message);
                    //平台物模型处理
                    try {
                        ClientMessage event = null;
                        //目前仅支持reply和report的topic
                        if (Topics.reply.equals(topicName)) {
                            event = JSON.parseObject(payload, CommandReplyEvent.class);
                        } else if (Topics.report.equals(topicName)) {
                            event = JSON.parseObject(payload, DeviceReportEvent.class);
                        }
                        if (null != event) {
                            event.setClientId(clientId);
                            //发布事件到spring
                            eventPublisher.publishEvent(event);
                        } else {
                            log.warn("不支持的topic:{} => {}", topicName, payload);
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                })
                .publishReleaseHandler(messageId -> {
                    log.debug("complete message :{}", messageId);
                    endpoint.publishComplete(messageId);
                });
        //注册设备
        clientRepository.register(client);
    }
 
    /**
     * 处理MQTT消息的发布,包括订阅关系的转发。
     * @param message MqttPublishMessage实例,代表接收到的MQTT消息
     */
    private void handlePublish(MqttPublishMessage message) {
        // Handle incoming publish messages
        String topic = message.topicName();
        String payload = message.payload().toString();
        int qos = message.qosLevel().value();
 
        System.out.println("Received message on [" + topic + "] payload [" + payload + "] with QoS " + qos);
 
        // Forward the message to subscribers matching the topic
        subscribers.forEach((subscribedTopic, subscriberEndpoint) -> {
            if (topicMatches(subscribedTopic, topic)) {
                // Handle different QoS levels
                switch (qos) {
                    case 0:
                        // QoS 0: At most once delivery, no acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false);
                        System.out.println("Message forwarded to [" + subscribedTopic + "] with QoS 0");
                        break;
                    case 1:
                    case 2:
                        // QoS 1 and QoS 2: Acknowledgment needed
                        subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false,
                            publishResult -> handlePublishResult(publishResult, subscribedTopic, qos));
                        break;
                    default:
                        System.err.println("Unsupported QoS level: " + qos);
                }
            }
        });
    }
 
    /**
     * 处理MQTT消息发布的结果。
     * @param publishResult AsyncResult实例,代表MQTT消息发布的异步结果
     * @param topic 订阅主题
     * @param qos QoS级别
     */
    private void handlePublishResult(AsyncResult<Integer> publishResult, String topic, int qos) {
        if (publishResult.succeeded()) {
            System.out.println("Message forwarded to subscribers of [" + topic + "] with QoS " + qos);
        } else {
            System.err.println("Failed to forward message to [" + topic + "] with QoS " + qos +
                ": " + publishResult.cause().getMessage());
        }
    }
 
    /**
     * 判断订阅主题是否匹配实际主题。
     * @param subscribedTopic 订阅主题
     * @param actualTopic 实际主题
     * @return 是否匹配
     */
    private boolean topicMatches(String subscribedTopic, String actualTopic) {
        String[] subscribedParts = subscribedTopic.split("/");
        String[] actualParts = actualTopic.split("/");
 
        if (subscribedParts.length != actualParts.length) {
            return false;
        }
 
        for (int i = 0; i < subscribedParts.length; i++) {
            if (!subscribedParts[i].equals("+") && !subscribedParts[i].equals(actualParts[i])) {
                return false;
            }
        }
 
        return true;
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 人工智能 Apache
2025 OSCAR丨与创新者同频!Apache RocketMQ 邀您共赴开源之约
10 月 28 日,阿里云高级技术专家周礼分享如何基于 Apache RocketMQ 新特性构建异步化 Multi-Agent 系统。
172 25
|
3月前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
易易互联科技有限公司是吉利集团旗下专注于换电生态的全资子公司,致力于打造安全、便捷、便宜的智能换电网络。公司依托吉利GBRC换电平台,基于电池共享与车辆全生命周期运营,已布局超470座换电站,覆盖40多个城市,计划2027年达2000座。面对海量设备高并发连接、高实时性要求及数据洪峰挑战,易易互联采用阿里云MQTT与RocketMQ构建高效物联网通信架构,实现稳定接入、低延迟通信与弹性处理,全面支撑其全国换电网络规模化运营与智能化升级。
290 1
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
|
消息中间件 运维 Serverless
商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源。云消息队列 RocketMQ 版 Serverless 实例通过资源快速伸缩,实现资源使用量与实际业务负载贴近,并按实际使用量计费,有效降低企业的运维压力和使用成本。
771 103
|
7月前
|
物联网
(手把手)在华为云、阿里云搭建自己的物联网MQTT消息服务器,免费IOT平台
本文介绍如何在阿里云搭建自己的物联网MQTT消息服务器,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
2625 42
|
7月前
|
物联网
如何在腾讯云等平台搭建自己的物联网MQTT服务器Broker
物联网技术及MQTT协议被广泛应用于各种场景。本文介绍物联网MQTT服务助手下载,如何搭建自己的物联网平台,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
621 37
|
6月前
|
消息中间件 Apache 双11
Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 顶级项目,源于阿里巴巴,历经多年双十一考验。RocketMQ 联合“太乙”平台启动开源竞赛,提供贡献价值评价与奖金激励(最高 5000 元),助力开发者成为社区核心成员。竞赛包含详尽教程与自动搭建环境,促进技术生态繁荣,推动分布式消息处理技术发展。欢迎加入,共创开源未来!
256 1
|
11月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
413 102
|
9月前
|
监控 物联网 网络性能优化
【杂谈】-MQTT与HTTP在物联网中的比较:为什么MQTT是更好的选择
通过上述分析,可以看出MQTT在物联网应用中的确是更好的选择。其高效的通信模型、低带宽消耗、稳定的连接保持机制以及可靠的消息质量保证,使其在各种物联网场景中都能表现出色。开发者在设计和实现物联网系统时,应优先考虑采用MQTT协议,以充分发挥其在资源受限环境下的优势,提升系统的整体性能和可靠性。
1662 26
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
209 68
|
9月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
238 1

热门文章

最新文章