Problem description
Two MQTT agents written in Go, A and B, run at the same time and listen on and use different topics. Agent A keeps going online and offline repeatedly, which causes the CMDB to repeatedly update and query devices, which in turn drives MongoDB load up. After shutting A down, the load returns to normal.
Troubleshooting process
Is the network itself the problem?
Wrote a publisher and a subscriber in Python to verify basic connectivity.
MQTT publisher:
import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set("{usr_name}", "{usr_key}")
client.connect('{host}', 1883, 600)  # 600 is the keepalive interval in seconds
client.loop_start()
time.sleep(2)

count = 0
while count < 5:
    count = count + 1
    client.publish('{my_topic}', payload='{msg}', qos=1)
    time.sleep(2)

client.loop_stop()
client.disconnect()
MQTT subscriber:
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))
    # subscribe in on_connect so the subscription is restored on reconnect
    client.subscribe(("{my_topic}", 1))

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set("{usr_name}", "{usr_key}")
client.connect('{host}', 1883, 600)  # 600 is the keepalive interval in seconds
client.loop_forever()  # blocks and keeps the connection alive
- A bug encountered along the way: publishing succeeded and the subscribe call succeeded, but the subscriber never received the messages.
Fix: acl.conf restricts which users may access which topics; after adding rules for the corresponding user and topic, messages were received successfully.
{allow, {user, "{usr_name}"}, publish, ["my/topic"]}.
{allow, {user, "{usr_name}"}, subscribe, ["my/topic"]}.
- Another bug: with qos=0 messages arrive essentially in real time, but with qos=1 the latency is noticeably higher.
Analyzing the source of paho-mqtt's client.publish:
def publish(self, topic, payload=None, qos=0, retain=False, properties=None):
    # ... preceding checks that topic, QoS level, payload encoding, etc. are valid
    if qos == 0:
        # QoS 0: send straight away, fire-and-forget
        info = MQTTMessageInfo(local_mid)
        rc = self._send_publish(
            local_mid, topic, local_payload, qos, retain, False, info, properties)
        info.rc = rc
        return info
    else:
        # QoS 1 or 2 involves retransmission: the client must wait for the
        # peer's acknowledgement, which guarantees at-least-once delivery
        message = MQTTMessage(local_mid, topic)
        message.timestamp = time_func()
        message.payload = local_payload
        message.qos = qos
        message.retain = retain
        message.dup = False
        message.properties = properties

        with self._out_message_mutex:
            if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
                message.info.rc = MQTT_ERR_QUEUE_SIZE
                return message.info

            if local_mid in self._out_messages:
                message.info.rc = MQTT_ERR_QUEUE_SIZE
                return message.info

            self._out_messages[message.mid] = message
            if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
                self._inflight_messages += 1
                if qos == 1:
                    message.state = mqtt_ms_wait_for_puback
                elif qos == 2:
                    message.state = mqtt_ms_wait_for_pubrec

                rc = self._send_publish(message.mid, topic, message.payload, message.qos, message.retain,
                                        message.dup, message.info, message.properties)

                # remove from inflight messages so it will be send after a connection is made
                if rc is MQTT_ERR_NO_CONN:
                    self._inflight_messages -= 1
                    message.state = mqtt_ms_publish

                message.info.rc = rc
                return message.info
            else:
                message.state = mqtt_ms_queued
                message.info.rc = MQTT_ERR_SUCCESS
                return message.info
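From the source above, a qos=1 publish is parked in _out_messages and only completes once the broker's PUBACK comes back, so the extra latency seen with qos=1 is at least one broker round trip (plus any broker-side throttling). A minimal sketch for measuring this on the publisher side, reusing the {host}/{usr_name}/{my_topic} placeholders from the scripts above:

import time
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.username_pw_set("{usr_name}", "{usr_key}")
client.connect('{host}', 1883, 600)
client.loop_start()

# QoS 0: _send_publish() is called inline and info.rc is set immediately
info0 = client.publish('{my_topic}', payload='{msg}', qos=0)
print(info0.rc, info0.is_published())

# QoS 1: the message sits in _out_messages until the PUBACK is processed,
# so wait_for_publish() measures the round trip that shows up as latency
start = time.time()
info1 = client.publish('{my_topic}', payload='{msg}', qos=1)
info1.wait_for_publish()  # blocks until the broker's PUBACK is handled
print("PUBACK after %.3fs" % (time.time() - start))

client.loop_stop()
client.disconnect()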
- Yet another bug: the publisher submitted 20,000 messages but the subscriber only received about 3,000, and once the publisher stopped submitting, the subscriber stopped receiving too; this looks like messages are being dropped instead of buffered in a queue (a sketch of the relevant client-side queue settings follows below).
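paho does keep QoS>0 messages in the _out_messages queue shown in the source above, and both the queue length and the number of in-flight messages are configurable per client (in paho 1.x the defaults are an unlimited queue and 20 in-flight messages, so broker-side limits are just as likely the cause here). A minimal sketch of loosening the client-side limits, with purely illustrative values:

import paho.mqtt.client as mqtt

client = mqtt.Client()
# 0 = unlimited: publish() never returns MQTT_ERR_QUEUE_SIZE for a full queue
client.max_queued_messages_set(0)
# allow more QoS>0 messages in flight at once instead of
# leaving them parked in the mqtt_ms_queued state
client.max_inflight_messages_set(100)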
Are multiple agents competing for resources?
Ran agent A and agent B separately. Agent A still triggers the repeated online/offline behavior when running alone, so the problem is more likely in A itself.
Analyzing the sending flow of the problematic agent A
Comparing A's and B's bizMap showed that A's bizMap contained two log files, one of them 3.7 GB. The Go logs showed that publish produced an error after A came online. One thing to note here: the Go MQTT logs are leveled, and you have to explicitly set logging.level=DEBUG in the .yml config file to see DEBUG-level output. (Thanks to the folks who pointed this out; this setting is genuinely hard to find by searching.)
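For reference, a sketch of the switch mentioned above, assuming the agent reads a nested logging.level key from its .yml file (the exact key layout depends on the agent's own config schema):

# agent .yml config -- assumed layout, adjust to the agent's actual schema
logging:
  level: DEBUG   # without this, the Go MQTT client's DEBUG-level logs are not emitted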
At this point there are several possibilities:
- The interface that receives A's messages can trigger A going offline, causing A to keep coming back online.
- A uploads multiple messages concurrently and that causes the disconnects (since the problem is also triggered with only a single A running, this case can be ruled out).
- The MQTT broker, EMQX, is itself rate-limiting the sending.
Reference article: https://juejin.cn/post/6850418118746243080
We enlarged the inbound packet rate limit, maximum concurrency, and related settings from the reference article in emqx.conf, restarted, and ran both agents again; the repeated online/offline problem was gone (a hedged sketch of the relevant settings follows).
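For the record, the kind of emqx.conf settings involved (key names below follow the EMQX 4.x config style and are written from memory, so treat them as an assumption; values are only illustrative and should be checked against your EMQX version and the reference article):

## emqx.conf (EMQX 4.x style) -- illustrative values only
## per-connection inbound rate limits
zone.external.rate_limit.conn_messages_in = 1000,10s
zone.external.rate_limit.conn_bytes_in = 1000KB,10s
## concurrency / queue limits
zone.external.max_inflight = 128
zone.external.max_mqueue_len = 10000
listener.tcp.external.max_conn_rate = 1000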
A few takeaways
The troubleshooting order for problems involving network communication can be summarized as:
- Whether the network itself can sustain the connection, and whether resource contention is involved
- Rate limits and concurrency limits of the communication protocol and broker themselves
- Whether downstream functionality driven by the communication is what triggers the disconnects