测试地点: 北京
测试主机: 公网测试主机
语言: python
计划用
python
实现mqtt 接入
topic
java/.net/c++
的样例, 但是没有 python 的样例程序, 自己测试在控制台无法看到 任何
消费者/生产者
连接
Connected with result code 0
Connection returned 0
publish success, msg = 2016-11-12 14:58:43.454239
下面是我写的测试连接程序
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
user = "user0001"
pwd = "user0001password"
mqtt_svr = "mqtt-test.cn-qingdao.aliyuncs.com"
port = 1883 # endpoint端口
topic = "testtopic" # 订阅的主题内容
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(topic, qos=0)
def on_message(client, userdata, msg):
print("topic:" + msg.topic + " Message:" + str(msg.payload))
client = mqtt.Client(
client_id="CID_test0001",
clean_session=True,
userdata=None,
protocol='MQTTv31'
)
client.username_pw_set(user, pwd)
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_svr, port, 60)
client.loop_forever()
import time
import paho.mqtt.client as mqtt
import datetime
def on_publish(msg, rc): # 成功发布消息的操作
if rc == 0:
print("publish success, msg = " + msg)
def on_connect(client, userdata, flags, rc): # 连接后的操作 0为成功
print("Connection returned " + str(rc))
client = mqtt.Client(
client_id="PID_test0001",
clean_session=True,
userdata=None,
protocol='MQTTv311'
)
user = "user0001"
pwd = "user0001password"
mqtt_svr = "mqtt-test.cn-qingdao.aliyuncs.com"
port = 1883
topic = "testtopic"
client.username_pw_set(user, pwd)
client.connect(endpoint, port, 60)
client.on_connect = on_connect
client.loop_start()
time.sleep(2)
count = 0
while count < 1:
count = count + 1
msg = str(datetime.datetime.now())
rc, mid = client.publish(topic, payload=msg, qos=0)
on_publish(msg, rc)
time.sleep(1)
我用您的代码 accesskey什么的换成 我自己的 生产者打印Connection returned 0
publish success, msg = 2016.... 消费者打印 Connected with result code 0 但是消费者收不到消息
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。