尝试在线程内运行HBMQTT消息代理时,我对asyncio还是陌生的。该手册提供了以下有关如何启动代理的示例:
import asyncio
import os
from hbmqtt.broker import Broker
@asyncio.coroutine
def broker_coro():
broker = Broker()
yield from broker.start()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(broker_coro())
asyncio.get_event_loop().run_forever()
由于我所依赖的体系结构,代理必须在线程内运行。不幸的是,以下基本示例在线程调用run()之前崩溃。
import asyncio
from threading import Thread
from hbmqtt.broker import Broker
class ExampleThread(Thread):
def __init__(self):
super().__init__()
self.daemon = True
self.config = {
'listeners': {
'default': {
'max-connections': 50000,
'bind': 'localhost:1883',
'type': 'tcp',
},
},
'auth': {
'allow-anonymous': True,
},
'plugins': [ 'auth_anonymous' ],
'topic-check': {
'enabled': False
}
}
self.loop = None
self.broker = None
@asyncio.coroutine
def broker_coroutine(self):
self.broker = Broker(self.config, self.loop)
yield from self.broker.start()
return self.broker
def run(self) -> None:
print('running ...')
self.loop.run_forever()
self.loop.run_until_complete(self.broker.shutdown())
self.loop.close()
def start(self):
print('starting thread ...')
self.loop = asyncio.new_event_loop()
print('starting server ...')
try:
start_server = asyncio.gather(self.broker_coroutine(),
loop=self.loop)
self.loop.run_until_complete(start_server)
broker = start_server.result()[0]
except:
print(traceback.format_exc())
self.loop.close()
super().start()
if __name__ == '__main__':
thread = ExampleThread()
thread.start()
启动示例将引发以下异常:
$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8024e7350>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1e50>
Traceback (most recent call last):
File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed
有人可以解释导致事件循环关闭的原因吗?如果我运行一个简单的测试协程,它将起作用:
async def test_coroutine(self):
while True:
await asyncio.sleep(1)
print('hey!')
问题来源:stackoverflow
我无法在Thread类中运行asyncio.coroutine,而是在一个可分辨的Thread中运行,如以下示例所示:
import asyncio
from threading import Thread
from hbmqtt.broker import Broker
class Server():
def __init__(self):
self.broker = None
self.config = {
'listeners': {
'default': {
'max-connections': 50000,
'bind': '127.0.0.1:1883',
'type': 'tcp',
},
},
'plugins': [ 'auth_anonymous' ],
'auth': {
'allow-anonymous': True,
},
'topic-check': {
'enabled': True,
'plugins': ['topic_taboo'],
},
}
async def broker_coroutine(self, config, loop):
self.broker = Broker(config, loop)
await self.broker.start()
def start(self):
loop = asyncio.new_event_loop()
thread = Thread(target=lambda: self.run(loop))
thread.start()
def run(self, loop):
try:
future = asyncio.gather(self.broker_coroutine(self.config, loop),
loop=loop,
return_exceptions=True)
loop.run_until_complete(future)
loop.run_forever()
except (Exception, KeyboardInterrupt):
loop.close()
finally:
loop.run_until_complete(self.broker.shutdown())
loop.close()
if __name__ == '__main__':
server = Server()
server.start()
回答来源:stackoverflow
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。