记录一次MQTT client反复上下线问题

简介: # 问题描述有两个使用go语言编写的MQTT agent A 和 B 同时运行,监听和使用不同的topic,其中一个agent A反复上线下线,导致cmdb反复更新和查询设备,进而导致mongo负载高。关掉 A之后负载恢复正常![](https://ata2-img.oss-cn-zhangjiakou.aliyuncs.com/neweditor/23fbb14a-70e4-48f7-a4a

问题描述

有两个使用go语言编写的MQTT agent A 和 B 同时运行,监听和使用不同的topic,其中一个agent A反复上线下线,导致cmdb反复更新和查询设备,进而导致mongo负载高。关掉 A之后负载恢复正常

排查过程

是否为网络本身问题

用python写了publisher和subscriber
MQTT消息发送端:

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()

client.on_message = on_message
client.username_pw_set("{usr_name}", "{usr_key}")
client.connect('{host}', 1883, 600)
client.on_connect = on_connect
client.loop_start()
time.sleep(2)
count = 0

while count < 5:
    count = count + 1
    client.publish('{my_topic}', payload='{msg}', qos=1)
    time.sleep(2)

MQTT消息监听端:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))
    client.subscribe(("{my_topic}", 1))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set("{usr_name}", "{usr_key}")
client.connect('{host}', 1883, 600)# 600为keepalive的时间间隔
client.loop_forever() # 保持连接
  • 遇到的一点bug:发送成功,监听成功,但是监听没有收到消息

解决方法:acl.conf里面配置了用户名和可以监控的topic,添加对应用户和topic后消息成功接收

{allow, {user, "{usr_name}"}, publish, ["my/topic"]}.
{allow, {user, "{usr_name}"}, publish, ["my/topic"]}.


  • 一点新的bug:qos=0时可以同步通信,qos=1时延迟较高

分析mqtt的client.publish源码

 def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
       ····· # 前置判断topic、Qos level、payload编码等是否有效
       if qos == 0:
            # Qos为0时直接发送
            info = MQTTMessageInfo(local_mid)
            rc = self._send_publish(
                local_mid, topic, local_payload, qos, retain, False, info, properties)
            info.rc = rc
            return info
        else:
            # QoS 为 1 或 2时包含了重发机制,需要等待对方的ACK,能保证消息至少到达一次
            message = MQTTMessage(local_mid, topic)
            message.timestamp = time_func()
            message.payload = local_payload
            message.qos = qos
            message.retain = retain
            message.dup = False
            message.properties = properties

            with self._out_message_mutex:
                if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
                    message.info.rc = MQTT_ERR_QUEUE_SIZE
                    return message.info

                if local_mid in self._out_messages:
                    message.info.rc = MQTT_ERR_QUEUE_SIZE
                    return message.info

                self._out_messages[message.mid] = message
                if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
                    self._inflight_messages += 1
                    if qos == 1:
                        message.state = mqtt_ms_wait_for_puback
                    elif qos == 2:
                        message.state = mqtt_ms_wait_for_pubrec

                    rc = self._send_publish(message.mid, topic, message.payload, message.qos, message.retain,
                                            message.dup, message.info, message.properties)

                    # remove from inflight messages so it will be send after a connection is made
                    if rc is MQTT_ERR_NO_CONN:
                        self._inflight_messages -= 1
                        message.state = mqtt_ms_publish

                    message.info.rc = rc
                    return message.info
                else:
                    message.state = mqtt_ms_queued
                    message.info.rc = MQTT_ERR_SUCCESS
                    return message.info
  • 一点别的bug:pub提交两万条数据而sub才收到3k条,并且在pub停止提交后sub也停止接受了,怀疑没有缓存队列

是否是多个agent在抢占资源

分别单独运行agent A 和 B,agent A单独运行时也会触发反复上下线问题,所以更倾向于是 A 本身的问题

分析有问题的agent A的发送流程

通过比较A和B的bizMap发现A的bizMap中有两个log文件,其中一个有3.7G。通过查看go的日志发现A上线后publish产生了error。此处有个需要注意的点,go语言的mqtt日志分级,需要在.yml配置文件中显式指定logging.level=DEBUG才能看到DEBUG级别的日志。(感谢大佬们指点,配置这玩意儿真的搜不出来
这时可能有几种情况:

  • 接受A消息的接口可以触发A的下线,导致A不断重新上线
  • A 并发处理多条消息上传,导致断连(由于仅运行一个 A 也会触发问题,这个情况可以排除)
  • mqtt的引擎EMQX自身限制了发送速率

参考文章: https://juejin.cn/post/6850418118746243080
我们将上述参考文章中提到的接受包速率限制和最大并发数等在配置emqx.conf中进行扩大,然后重启再运行两个agent,反复上下线问题就解决了

一些小结

涉及网络通信问题是的排查顺序可以总结为:

  1. 网络本身能否保证连接,是否涉及资源抢占
  2. 通信协议自身的速率限制、并发限制
  3. 通信的后续功能是否触发掉线
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
网络安全 API Android开发
Eclipse Paho:MQTT Client C的使用
Eclipse Paho:MQTT Client C的使用
815 0
|
运维 监控 JavaScript
基于开源Python MQTT Client连接阿里云IoT
前面介绍了基于开源JAVA MQTT Client连接阿里云IoT,这里面继续介绍如果使用Python MQTT Client连接阿里云IoT。
基于开源Python MQTT Client连接阿里云IoT
|
弹性计算 网络协议 物联网
JAVA MQTT Client如何连接阿里云IoT?
在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。
2127 0
JAVA MQTT Client如何连接阿里云IoT?
|
网络协议 Java 物联网
基于开源JAVA MQTT Client连接阿里云IoT
在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。
基于开源JAVA MQTT Client连接阿里云IoT
|
网络协议 物联网 Java
基于开源Java MQTT Client的阿里云物联网平台RRPC功能测试
本文主要基于开源Java MQTT Client,分别针对系统Topic和自定义Topic,演示阿里云物联网平台RRPC的实现。
基于开源Java MQTT Client的阿里云物联网平台RRPC功能测试
|
物联网 Java C#
基于开源 Net MQTT Client 连接阿里云物联网平台
前面分别介绍了基于开源MQTT Client Java及Python语言连接阿里云物联网平台的示例,这里使用M2Mqtt Client C#进行测试,结合自定义Topic演示消息的上下行。
基于开源 Net MQTT Client 连接阿里云物联网平台
|
传感器 机器学习/深度学习 网络协议
MQTT C Client实现消息推送(入门指南)
MQTT C Client实现消息推送(入门指南) MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,通过协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。
7959 0
|
C# 消息中间件 RocketMQ
rocketmq client for c#
基于ikvm的rocketmq的c#客户端,由于阿里对c#不敏感,对这方面的东西缺少。因为工作需要弄了一个,分享给大家 https://github.com/franknew/RocketMQ-Client 如何使用: 1.
5331 0
|
物联网
基于开源 Net MQTT Client 连接阿里云物联网平台
前面分别介绍了基于开源MQTT Client Java及Python语言连接阿里云物联网平台的示例,这里使用M2Mqtt Client C#进行测试,结合自定义Topic演示消息的上下行。
3157 0
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。