异步编程中常见的问题和处理方式

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: 【6月更文挑战第23天】在python中`asyncio` 提供PriorityQueue和LifoQueue,用于不同检索策略。异步编程需注意任务调度、错误处理和资源管理,以提高响应性和避免阻塞。

简介

异步编程常见异常如TimeoutError、CancelledError、InvalidStateError等。
调试模式通过PYTHONASYNCIODEBUG或loop.set_debug启用,检查未等待的协程,增强错误检测。
避免阻塞代码,使用run_in_executor。
日志记录可通过logging模块调整。
未等待的协程和未处理异常会发出警告。
treeoflife6.png

1 优先队列 PriorityQueue

PriorityQueue子类Queue,以优先级顺序检索条目 从低到低,条目通常是以元组形式 (优先级,数据)。

    class asyncio.PriorityQueue

LifoQueue 子类Queue首先检索最近添加的条目

    class asyncio.LifoQueue

异常

exception asyncio.TimeoutError

        该操作已超过规定的截止日期。
        重要 这个异常与内置 TimeoutError 异常不同。

exception asyncio.CancelledError

       该操作已被取消。
        取消asyncio任务时,可以捕获此异常以执行自定义操作。
        在几乎所有情况下,都必须重新引发异常。
        在 3.8 版更改: CancelledError 现在是 BaseException 的子类。

exception asyncio.InvalidStateError

        Task 或 Future 的内部状态无效。
        在为已设置结果值的未来对象设置结果值等情况下,可以引发此问题。

exception asyncio.SendfileNotAvailableError

        "sendfile" 系统调用不适用于给定的套接字或文件类型。
        子类 RuntimeError 。

exception asyncio.IncompleteReadError

        请求的读取操作未完全完成。
        由 asyncio stream APIs 提出
        此异常是 EOFError 的子类。    

expected

        预期字节的总数( int )。

partial

        到达流结束之前读取的 bytes 字符串。

exception asyncio.LimitOverrunError

        在查找分隔符时达到缓冲区大小限制。
        由 asyncio stream APIs 提出
        consumed
                要消耗的字节总数。

asyncio.QueueEmpty

        get_nowait() 时,Queue为空对象时引发

asyncio.QueueFull

        put_nowait() 在Queue已满的对象上调用该方法时引发异常

2 使用 用asyncio开发

异步与传统的顺序 编程不同. 这里有一些常见的错误和陷阱,如何避免它们.

 异步调试模式
 消除
 并发和多线程
 正确处理阻塞功能
 日志记录
 检测从未调度的协程对象
 检测从未消耗的异常
 正确协程
 待处理任务已销毁
 关闭传输和事件循环

3 Debug模式

默认场景下,asyncio 以生成模式运行,为了简化开发,asyncio还有一种debug模式

    将 PYTHONASYNCIODEBUG 设置为 1
    使用 python 开发模式
    将debug=True 传递给 asyncio.run()
    调用 loop.set_debug()

调试模式将有以下效应

asyncio 检查 未被等待的协程 并记录他们,这将消除被遗忘的等待的问题.
许多非线程安全的异步APIs ,例如loop.call_soon(), loop._call_at(), 如果从错误线程调用,则引发异常.

如果执行I/O操作花费时间太长,则记录I/O选择器的执行时间.
执行时间超过100毫秒的回调将载入日志.

属性 loop.slow_callback_duration 可用于设置秒为单位的最小执行持续时间. 这表示 缓慢.

4 并发性,多线程

事件循环在线程中运行,通常是主线程,并在其线程执行所有回调和任务.
当一个任务在事件循环中运行时,没有其他任务可以在同一线程运行.当一个任务执行一个 await表达式时

正在运行的任务被挂起,事件循环执行下一个任务.

要调度来自另一个OS线程的callback,应该使用 loop.call_soon_threadsafe()方法. 例如

            loop.call_soon_threadsafe(callback, *args)

几乎所有异步对象 都不是线程安全的,这通常不是问题.

除法在任务或回调函数之外有代码 可以使用他们.如果需要这样的代码调用低级 异步API
应该使用loop.call_soon_threadsafe() 方法,如

            loop.call_soon_threadsafe(fut, cancel)

要从不同OS线程调度一个协程对象,应该使用run_coroutine_threadsafe() 函数.它返回一个 Future

            async def coro_func():
                    return await asyncio.sleep(1, 42)
            # Later in another OS thread
            future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
            # 等待结果
            result = future.result()

为了能够处理信号和执行子进程,事件循环必须运行于主线程中.

方法loop.run_in_executor() 可以和concurrent.future.ThreadPoolExecutor 一起使用.

用于在一个不同操作系统线程中执行阻塞代码,并避免阻塞运行事件循环的哪个操作系统线程.

目前没有其他办法能直接从另一个进程(例如通过 multiprocessing 启动的进程) 安排协程或回调.

事件循环方法 中 有一些 可以从管道读取并监视文件 描述符 而不会阻塞事件循环的API

此外,asyncio的子进程 API提供了一种 启动进程并从事件循环与其通信的办法.

最后,之前提到的 loop.run_inexecutor() 方法也可以配合 concurrent.futures.ProcessPoolExecutor

使用以在另一个进程 执行代码.

  • 示例,不启用 debug = True时的运行时错误信息

    python asyncexample.py
       /asyncexample.py:16: RuntimeWarning: coroutine 'test' was never awaited
        test()
      RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    
  • 示例,启动 debug=True时 的运行错误信息

      python asyncexample.py
       /asyncexample.py:16: RuntimeWarning: coroutine 'test' was never awaited
      Coroutine created at (most recent call last)
        File "/asyncexample.py", line 21, in <module>
          a = asyncio.run(main11(),debug=True)
        File "/asyncio/runners.py", line 44, in run
          return loop.run_until_complete(main)
        File "/asyncio/base_events.py", line 629, in run_until_complete
          self.run_forever()
        File "/asyncio/base_events.py", line 596, in run_forever
          self._run_once()
        File "/asyncio/base_events.py", line 1882, in _run_once
          handle._run()
        File "/asyncio/events.py", line 80, in _run
          self._context.run(self._callback, *self._args)
        File "/asyncexample.py", line 16, in main11
          test()
        test()
      RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

5 运行日志和阻塞的代码

不应该直接调用阻塞(CPU绑定代码).例如,如果一个函数执行1秒的CPU 密集型计算,那么所有并发任务和IO操作都延迟1秒.

可以用执行器在不同线程 甚至不同进程运行任务,以避免使用事件循环阻塞 OS 线程.
loop.run_in_executor() 了解详情.

  • 日志记录

asyncio使用 logging模块,所有日志记录都是通过asyncio logger执行的
默认日志记录是 logging.INFO,可以很容易调整
logging.getLogger("asyncio").setLevel(logging.WARNING)

6 检测 never-awaited 协同程序

当协程函数被调用,而不是被等待时, 即执行 coro() 而不是 await coro() 或协程没有通过 asyncio.create_task() 被排入计划日程,asyncio将发出一条 RuntimWarning

import asyncio
async def test():
    print("never scheduled")
async def main():
    test()
asyncio.run(main())    
##
    test.py:7: RuntimeWarning:coroutine 'test' was never awaited
        test()

7 抛出异常 用户处理错误,而不是检测到错误退出

如果调用 Future.set_exception() 但不等的Future对象,将异常传播到用户代码.
这种情况看下,当Future对象被垃圾收集时,asyncio将发出一条日志消息.

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

未启用调试模式

python asyncexample.py
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<bug() done, defined at /asyncexample.py:20> exception=Exception('not consumed')>
Traceback (most recent call last):
  File "/asyncexample.py", line 21, in bug
    raise Exception("not consumed")
Exception: not consumed

使用调试模式debug=True,以便跟踪信息

    python asyncexample.py
    Task exception was never retrieved
    future: <Task finished name='Task-2' coro=<bug() done, defined at /asyncexample.py:20> exception=Exception('not consumed') created at /asyncio/tasks.py:361>
    source_traceback: Object created at (most recent call last):
      File "/asyncexample.py", line 30, in <module>
        a = asyncio.run(main(),debug=True)
      File "/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/asyncio/base_events.py", line 629, in run_until_complete
        self.run_forever()
      File "/asyncio/base_events.py", line 596, in run_forever
        self._run_once()
      File "/asyncio/base_events.py", line 1882, in _run_once
        handle._run()
      File "/asyncio/events.py", line 80, in _run
        self._context.run(self._callback, *self._args)
      File "/asyncexample.py", line 24, in main
        asyncio.create_task(bug())
      File "/asyncio/tasks.py", line 361, in create_task
        task = loop.create_task(coro)
    Traceback (most recent call last):
      File "/asyncexample.py", line 21, in bug
        raise Exception("not consumed")
    Exception: not consumed

8 小结

我们这里介绍了 经典异步编程中一些需要理解的同步原语,并且举例实际使用时的步骤,和需要注意的问题。

异步编程可以大大提供程序的响应能力,特别是在实时系统和多个操作的业务中。

本节代码:

 github.com/hahamx/examples/tree/main/alg_practice/1_pys_async
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
15天前
|
JSON 前端开发 JavaScript
在JavaScript中,异步编程是一种处理非阻塞操作(如网络请求、文件读写等)的重要技术
【6月更文挑战第12天】JavaScript中的异步编程通过Promise和async/await处理非阻塞操作。Promise管理异步操作的三种状态,防止回调地狱,支持链式调用和并行处理。async/await是ES8引入的语法糖,使异步代码更像同步代码,提高可读性。两者结合使用能更高效地处理复杂异步场景。
23 3
|
1月前
|
安全 API 调度
异步编程中你需要知道的 指令、响应,同步原语 和 使用时注意事项
【5月更文挑战第1天】`asyncio`提供了多种同步原语,如Lock、Event、Condition、Semaphore和BoundedSemaphore。Lock用于基本同步,Event用于标志状态切换,Condition允许在满足特定条件时唤醒协程,Semaphore控制资源访问,BoundedSemaphore防止计数超过预设值。
88 7
|
1月前
|
JavaScript 前端开发 UED
在 JavaScript 中,异步编程和回调函数是处理非阻塞操作(如网络请求、文件读写等)的重要工具
【5月更文挑战第10天】JavaScript中的异步编程和回调函数用于处理非阻塞操作,提高应用响应性和吞吐量。异步编程通过回调函数、Promises和async/await等方式实现,避免程序因等待操作完成而阻塞。回调函数是异步操作完成后调用的函数,常用于处理网络请求等。然而,回调函数存在嵌套问题和错误处理困难,因此出现了Promises和async/await等更优解决方案。
21 3
|
1月前
|
前端开发 JavaScript UED
由于JavaScript是单线程的,因此在处理大量异步操作时,需要确保不会阻塞UI线程
【5月更文挑战第13天】JavaScript中的Promise和async/await常用于处理游戏开发中的异步操作,如加载资源、网络请求和动画帧更新。Promise表示异步操作的结果,通过.then()和.catch()处理回调。async/await作为Promise的语法糖,使异步代码更简洁,类似同步代码。在游戏循环中,使用async/await可清晰管理资源加载和更新,但需注意避免阻塞UI线程,并妥善处理加载顺序、错误和资源管理,以保证游戏性能和稳定性。
36 3
|
1月前
|
前端开发 JavaScript UED
JavaScript 的事件循环机制是其非阻塞 I/O 模型的核心
【5月更文挑战第9天】JavaScript的事件循环机制是其非阻塞I/O的关键,通过单线程的调用栈和任务队列处理异步任务。当调用栈空时,事件循环从任务队列取出一个任务执行,形成循环。异步操作完成后,回调函数进入任务队列,等待被事件循环处理。微任务如Promise回调在每个宏任务结束后执行。此机制确保JavaScript能高效处理异步操作,不阻塞主线程,提供流畅的用户体验。
24 2
|
1月前
|
Linux 程序员 C++
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
315 2
|
1月前
|
iOS开发
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
多线程和异步编程:解释 iOS 中的同步和异步任务的概念。
50 1
|
1月前
|
前端开发 JavaScript
异步编程:由于JS是单线程执行的,所以对于耗时的操作(如网络请求),需要通过异步编程来处理。回调函数、Promise、async/await都是常用的异步编程方式。
异步编程:由于JS是单线程执行的,所以对于耗时的操作(如网络请求),需要通过异步编程来处理。回调函数、Promise、async/await都是常用的异步编程方式。
62 1
|
6月前
|
程序员 调度 C#
协程是什么?为何说协程具有同步的编程方式又具有异步的性能?
协程是什么?为何说协程具有同步的编程方式又具有异步的性能?
145 0
|
8月前
|
Java
异步编程 - 02 显式使用线程和线程池实现异步编程
异步编程 - 02 显式使用线程和线程池实现异步编程
42 0