/** * VertxMqttServer: 一个基于Vert.x实现的MQTT服务器。 * 该服务器支持设备连接、认证、消息发布和订阅功能。 */ package com.mz.network.mqtt; import com.alibaba.fastjson.JSON; import com.mz.network.authority.DeviceAuthorityService; import com.mz.network.client.Client; import com.mz.network.client.ClientRepository; import com.mz.network.client.message.ClientMessage; import com.mz.network.core.Topics; import com.mz.network.events.CommandReplyEvent; import com.mz.network.events.DeviceReportEvent; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.*; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class VertxMqttServer extends AbstractVerticle { @Autowired private ClientRepository clientRepository; @Value("${vertx.service-id}") private String serviceId; @Autowired private ApplicationEventPublisher eventPublisher; @Autowired private MqttServerOptions mqttServerOptions; @Autowired private DeviceAuthorityService authorityService; // 用于存储订阅关系的映射 private final Map<String, MqttEndpoint> subscribers = new ConcurrentHashMap<>(); /** * 重写AbstractVerticle的start方法,实现MQTT服务器的启动和初始化。 * @throws Exception 启动过程中可能抛出的异常 */ @Override public void start() throws Exception { // 创建MQTT服务器实例 MqttServer mqttServer = MqttServer.create(vertx, mqttServerOptions); // 处理MQTT客户端连接请求 mqttServer.endpointHandler(mqttEndpoint -> { String clientId = mqttEndpoint.clientIdentifier(); log.debug("接收到MQTT客户端[{}]消息", clientId); // 执行创建连接 doConnect(mqttEndpoint); }).listen(result -> { if (result.succeeded()) { int port = mqttServer.actualPort(); log.debug("MQTT server started on port {}", port); } else { log.warn("MQTT server start failed", result.cause()); } }); } /** * 处理MQTT客户端连接请求。 * @param endpoint MqttEndpoint实例,代表与客户端的连接 */ protected void doConnect(MqttEndpoint endpoint) { if (endpoint.auth() == null) { endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); return; } String userName = endpoint.auth().userName(); String passWord = endpoint.auth().password(); if (authorityService.verification(endpoint.clientIdentifier(), userName, passWord)) { log.debug("MQTT客户端:{}认证通过", endpoint.clientIdentifier()); acceptConnect(endpoint); } else { log.warn("客户端[{}]用户名密码错误", endpoint.clientIdentifier()); endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); } } /** * 接受MQTT客户端连接,并进行订阅处理。 * @param endpoint MqttEndpoint实例,代表与客户端的连接 */ protected void acceptConnect(MqttEndpoint endpoint) { String clientId = endpoint.clientIdentifier(); MqttClienttest client = new MqttClienttest(endpoint); endpoint.accept(false) .closeHandler(v -> { log.debug("[{}] closed", clientId); Client old = clientRepository.getClient(clientId); if (old == client) { clientRepository.unregister(clientId); } else { log.debug("client {} is unregistered", client); } }) .subscribeHandler(subscribe -> { List<MqttQoS> grantedQosLevels = new ArrayList<>(); for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { log.info("[{}] Subscription for {} with QoS {}", clientId, s.topicName(), s.qualityOfService()); grantedQosLevels.add(s.qualityOfService()); subscribers.put(s.topicName(), endpoint); } // ack the subscriptions request endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels); // specifing handlers for handling QoS 1 and 2 endpoint.publishAcknowledgeHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId)) .publishReceivedHandler(endpoint::publishRelease) .publishCompletionHandler(messageId -> log.info("[{}] Received ack for message = {}", clientId, messageId)); }) .unsubscribeHandler(unsubscribe -> { unsubscribe.topics().forEach(topicName -> { log.info("[{}] Unsubscription for {}", clientId, topicName); // Remove the endpoint when a client unsubscribes subscribers.remove(topicName); }); // ack the subscriptions request endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); }) .disconnectHandler(v -> log.info("[{}] Received disconnect from client", clientId)) .exceptionHandler(e -> log.error(clientId, e)) .publishHandler(message -> { //设备推送了消息 String topicName = message.topicName(); Buffer buffer = message.payload(); String payload = buffer.toString(); log.info("接受到客户端消息推送:[{}] payload [{}] with QoS [{}]", topicName, payload, message.qosLevel()); if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { endpoint.publishAcknowledge(message.messageId()); } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { endpoint.publishReceived(message.messageId()); } //往订阅的客户端发送消息 handlePublish(message); //平台物模型处理 try { ClientMessage event = null; //目前仅支持reply和report的topic if (Topics.reply.equals(topicName)) { event = JSON.parseObject(payload, CommandReplyEvent.class); } else if (Topics.report.equals(topicName)) { event = JSON.parseObject(payload, DeviceReportEvent.class); } if (null != event) { event.setClientId(clientId); //发布事件到spring eventPublisher.publishEvent(event); } else { log.warn("不支持的topic:{} => {}", topicName, payload); } } catch (Exception e) { log.error(e.getMessage(), e); } }) .publishReleaseHandler(messageId -> { log.debug("complete message :{}", messageId); endpoint.publishComplete(messageId); }); //注册设备 clientRepository.register(client); } /** * 处理MQTT消息的发布,包括订阅关系的转发。 * @param message MqttPublishMessage实例,代表接收到的MQTT消息 */ private void handlePublish(MqttPublishMessage message) { // Handle incoming publish messages String topic = message.topicName(); String payload = message.payload().toString(); int qos = message.qosLevel().value(); System.out.println("Received message on [" + topic + "] payload [" + payload + "] with QoS " + qos); // Forward the message to subscribers matching the topic subscribers.forEach((subscribedTopic, subscriberEndpoint) -> { if (topicMatches(subscribedTopic, topic)) { // Handle different QoS levels switch (qos) { case 0: // QoS 0: At most once delivery, no acknowledgment needed subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false); System.out.println("Message forwarded to [" + subscribedTopic + "] with QoS 0"); break; case 1: case 2: // QoS 1 and QoS 2: Acknowledgment needed subscriberEndpoint.publish(topic, message.payload(), message.qosLevel(), message.isDup(), false, publishResult -> handlePublishResult(publishResult, subscribedTopic, qos)); break; default: System.err.println("Unsupported QoS level: " + qos); } } }); } /** * 处理MQTT消息发布的结果。 * @param publishResult AsyncResult实例,代表MQTT消息发布的异步结果 * @param topic 订阅主题 * @param qos QoS级别 */ private void handlePublishResult(AsyncResult<Integer> publishResult, String topic, int qos) { if (publishResult.succeeded()) { System.out.println("Message forwarded to subscribers of [" + topic + "] with QoS " + qos); } else { System.err.println("Failed to forward message to [" + topic + "] with QoS " + qos + ": " + publishResult.cause().getMessage()); } } /** * 判断订阅主题是否匹配实际主题。 * @param subscribedTopic 订阅主题 * @param actualTopic 实际主题 * @return 是否匹配 */ private boolean topicMatches(String subscribedTopic, String actualTopic) { String[] subscribedParts = subscribedTopic.split("/"); String[] actualParts = actualTopic.split("/"); if (subscribedParts.length != actualParts.length) { return false; } for (int i = 0; i < subscribedParts.length; i++) { if (!subscribedParts[i].equals("+") && !subscribedParts[i].equals(actualParts[i])) { return false; } } return true; } }