【开发者笔记】MQTT python测试笔记

简介: MQTT是基于订阅/发布的物联网协议。 python测试需要一个发送进程和接收进程,即一个发送客户端和一个接收客户端,如果这两个客户端工作在同一个topic下,那么就能进行消息互通了。 服务器用“iot.eclipse.org”就好了,避免了自己搭建服务器,然后流程还可以跑通。

MQTT是基于订阅/发布的物联网协议。

python测试需要一个发送进程和接收进程,即一个发送客户端和一个接收客户端,如果这两个客户端工作在同一个topic下,那么就能进行消息互通了。

服务器用“iot.eclipse.org”就好了,避免了自己搭建服务器,然后流程还可以跑通。

发送客户端代码:

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish

idx = 0
#往paho/temperature 一直发送内容 while True: print("send success") publish.single("paho/temperature", payload="this is message:%s"%idx, hostname="iot.eclipse.org", client_id="lora1", # qos = 0, # tls=tls, port=1883, protocol=mqtt.MQTTv311) idx += 1

  接收客户端代码:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    #在这里处理业务逻辑
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("iot.eclipse.org", 1883, 60)
#订阅频道 client.subscribe("paho/temperature") # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. client.loop_forever()

  然后运行两个客户端,就可以在接收端收到消息了。

  MQTT服务器不负责存储数据,需要编写额外的接收客户端来接收数据、分析、入库等。

  MQTT服务器用的是iot.eclipse.org,如果碰巧两个人在用同一个频道,那可能收到别人的消息哦~

  如果要搭建自己的MQTT服务器,那么回头再说。

  玩一玩就好了,不要给服务器增加太多负担哟~

 

 

参考资料:

  paho-qtt说明文档

  

黑夜给了我黑色的眼睛,我却用它寻找光明
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1天前
|
数据采集 数据可视化 数据挖掘
【新手解答】Python中Pandas的初学者笔记
【新手解答】Python中Pandas的初学者笔记
4 0
|
2天前
|
NoSQL 关系型数据库 MySQL
涉及rocketMQ,jemeter等性能测试服务器的安装记录
涉及rocketMQ,jemeter等性能测试服务器的安装记录
12 1
|
3天前
|
SQL 测试技术 网络安全
Python之SQLMap:自动SQL注入和渗透测试工具示例详解
Python之SQLMap:自动SQL注入和渗透测试工具示例详解
13 0
|
4天前
|
测试技术 开发者 Python
Python中的单元测试
【5月更文挑战第8天】在Python软件开发中,确保代码质量是关键,单元测试和测试驱动开发(TDD)是实现这一目标的有效方法。本文介绍了如何使用unittest和pytest进行单元测试,以及如何通过TDD编写可靠代码。首先,展示了单元测试的基本概念和示例,然后详细解释了TDD的"红-绿-重构"循环。此外,还讨论了pytest如何简化单元测试,并给出了使用TDD重构函数的例子。
6 1
|
8天前
|
Java 开发者 索引
Python基础语法:类笔记
本篇博文是把自己在学习python类的过程中自己理解和笔记,一点一点总结的写出出来,做一个总结,加深对面向对象编程的理解。
|
9天前
|
监控 测试技术 持续交付
Python自动化测试代理程序可用性
总之,通过编写测试用例、自动化测试和设置监控系统,您可以确保Python自动化测试代理程序的可用性,并及时发现和解决问题。这有助于提供更可靠和高性能的代理服务。
14 4
|
11天前
|
机器学习/深度学习 算法 UED
【Python 机器学习专栏】A/B 测试在机器学习项目中的应用
【4月更文挑战第30天】A/B测试在数据驱动的机器学习项目中扮演关键角色,用于评估模型性能、算法改进和特征选择。通过定义目标、划分群组、实施处理、收集数据和分析结果,A/B测试能帮助优化模型和用户体验。Python提供工具如pandas和scipy.stats支持实验实施与分析。注意样本量、随机性、时间因素和多变量分析,确保测试有效性。A/B测试助力于持续改进机器学习项目,实现更好的成果。
|
13天前
|
测试技术 Python
python运行集成测试
【4月更文挑战第22天】
10 1
|
13天前
|
测试技术 Python
python编写集成测试
【4月更文挑战第22天】
4 1
|
14天前
|
数据可视化 测试技术 持续交付
python分析测试结果
【4月更文挑战第21天】
21 3

热门文章

最新文章