
Running a Python 3 asyncio.coroutine forever inside a thread

I am new to asyncio and am trying to run the HBMQTT message broker inside a thread. The manual gives the following example of how to start the broker:

import asyncio
import os
from hbmqtt.broker import Broker

@asyncio.coroutine
def broker_coro():
    broker = Broker()
    yield from broker.start()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(broker_coro())
    asyncio.get_event_loop().run_forever()

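Because of the architecture I depend on, the broker has to run inside a thread. Unfortunately, the following basic example crashes before the thread's run() is called.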

import asyncio
import traceback
from threading import Thread
from hbmqtt.broker import Broker

class ExampleThread(Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.config = {
            'listeners': {
                'default': {
                    'max-connections': 50000,
                    'bind': 'localhost:1883',
                    'type': 'tcp',
                },
            },
            'auth': {
                'allow-anonymous': True,
            },
            'plugins': [ 'auth_anonymous' ],
            'topic-check': {
                'enabled': False
            }
        }
        self.loop = None
        self.broker = None

    @asyncio.coroutine
    def broker_coroutine(self):
        self.broker = Broker(self.config, self.loop)
        yield from self.broker.start()
        return self.broker

    def run(self) -> None:
        print('running ...')
        self.loop.run_forever()
        self.loop.run_until_complete(self.broker.shutdown())
        self.loop.close()

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        print('starting server ...')
        try:
            start_server = asyncio.gather(self.broker_coroutine(),
                                          loop=self.loop)
            self.loop.run_until_complete(start_server)
            broker = start_server.result()[0]
        except:
            print(traceback.format_exc())
            self.loop.close()

        super().start()


if __name__ == '__main__':
    thread = ExampleThread()
    thread.start()

Starting the example raises the following exception:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8024e7350>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1e50>
Traceback (most recent call last):
  File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
  File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed

Can someone explain what causes the event loop to be closed? If I run a simple test coroutine instead, it works:

async def test_coroutine(self):
    while True:
        await asyncio.sleep(1)
        print('hey!')  

Question source: stackoverflow

is大龙 2020-03-24 14:23:36
1 answer
  • I could not get the asyncio.coroutine to run inside a Thread subclass, so I run it in a separate Thread instead, as in the following example:

    import asyncio
    from threading import Thread
    from hbmqtt.broker import Broker
    
    class Server():
    
        def __init__(self):
            self.broker = None
            self.config = {
                'listeners': {
                    'default': {
                        'max-connections': 50000,
                        'bind': '127.0.0.1:1883',
                        'type': 'tcp',
                    },
                },
                'plugins': [ 'auth_anonymous' ],
                'auth': {
                    'allow-anonymous': True,
                },
                'topic-check': {
                    'enabled': True,
                    'plugins': ['topic_taboo'],
                },
            }
    
        async def broker_coroutine(self, config, loop):
            self.broker = Broker(config, loop)
            await self.broker.start()
    
        def start(self):
            loop = asyncio.new_event_loop()
            thread = Thread(target=lambda: self.run(loop))
            thread.start()
    
        def run(self, loop):
            # Runs in the worker thread: bind the loop to this thread first.
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self.broker_coroutine(self.config, loop))
                loop.run_forever()
            finally:
                # Shut the broker down before closing the loop. Closing first
                # makes run_until_complete() raise "Event loop is closed".
                try:
                    if self.broker is not None:
                        loop.run_until_complete(self.broker.shutdown())
                finally:
                    loop.close()
    
    if __name__ == '__main__':
        server = Server()
        server.start()
    

    Answer source: stackoverflow

    2020-03-24 14:23:45
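
A standard-library-only sketch of the same pattern may help separate threading problems from HBMQTT itself. It assumes that every call touching the loop is made from the thread that runs it, and that shutdown is requested from outside through `call_soon_threadsafe`. `LoopThread`, `ticker`, `ticks`, and `demo` are hypothetical names introduced for illustration; `ticker` only stands in for a long-running service such as the broker.

```python
import asyncio
import threading
import time


class LoopThread(threading.Thread):
    """Hypothetical helper: owns an event loop that runs inside the thread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.ticks = 0

    async def ticker(self):
        # Stand-in for a long-running service (assumption, not HBMQTT code).
        while True:
            await asyncio.sleep(0.01)
            self.ticks += 1

    def run(self):
        # All loop work happens here, in the worker thread.
        asyncio.set_event_loop(self.loop)
        task = self.loop.create_task(self.ticker())
        try:
            self.loop.run_forever()
        finally:
            # Cancel and drain pending work before closing, so that no task
            # is destroyed while it is still pending.
            task.cancel()
            self.loop.run_until_complete(
                asyncio.gather(task, return_exceptions=True))
            self.loop.close()

    def stop(self):
        # loop.stop() is not thread-safe, so hand it to the loop's own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()


def demo():
    worker = LoopThread()
    worker.start()
    time.sleep(0.2)  # let the stand-in service run for a moment
    worker.stop()
    return worker
```

Calling `demo()` starts the loop in a worker thread, lets it run briefly, and then stops it. The loop is closed only after the pending task has been cancelled and awaited, which is the ordering that avoids a "Task was destroyed but it is pending!" warning in this sketch.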