一般涉及异步编程我都无脑用celery,但是最近在做一个项目,项目不大,也不涉及定时任务,所以就用了asyncio。
asyncio是python自带的模块,比celery轻量,使用起来也简单。以前学习过,但是公司项目中碰到并发任务基本都用celery处理,所以没有使用,这次就简单总结一下。
1. 基本概念
asyncio简介
asyncio是Python的标准库,它提供了一种异步编程的框架,可以用于编写并发程序。asyncio使用事件循环来管理异步任务,通过协程(coroutine)来实现异步操作。
协程
协程又叫微线程,是一种特殊的函数,它可以挂起和恢复执行,而不阻塞事件循环。协程使用async和await关键字来定义和调用。
通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行另一个函数。 简而言之,其实就是通过一个线程实现代码块相互切换执行。
事件循环
事件循环是一个持续运行的循环,用于调度和执行任务。它不断地检查是否有任务需要执行,并在任务完成时继续处理其他任务。
2.实际项目场景--开发盘点系统
仓库有多台堆垛机,输入盘点任务,盘点系统需要根据盘点货位号通知堆垛机到达指定位置,堆垛机到达指定货位后通知盘点系统同时进行RFID扫描和视觉比对。
下面将分2个部分介绍asyncio的使用,第一部分是使用asyncio实现并发网络请求,如图中①所示,第二部分是使用asyncio实现异步处理长时间的耗时任务,如图中②所示
3.asyncio实现并发网络请求
asyncio实现并发网络请求,需要使用asyncio的异步网络请求库aiohttp。aiohttp是一个基于asyncio的异步HTTP客户端/服务器库,可以用于发送HTTP请求和创建HTTP服务器。
示例如下,发送请求通知堆垛机到达指定货位,请求地址相同,但是请求参数(货位号)不同,使用asyncio实现并发请求,节省盘点时间。
import asyncio import datetime import aiohttp async def async_post(url, data,headers): async with aiohttp.ClientSession() as session: async with session.post(url, json=data,headers=headers) as response: return await response.json() async def execute_queue(payloads): url = 'http://127.0.0.1:8011/api/smart_check/execute_queue/' headers = {"content-type": "application/json"} tasks = [] for payload in payloads: task = asyncio.create_task(async_post(url, payload,headers)) tasks.append(task) responses = await asyncio.gather(*tasks) for response in responses: #打印响应结果和时间 print(response,datetime.datetime.now()) # Run the asyncio event loop if __name__ == "__main__": payloads = [ {'space_number': '1号'}, # 货位号 {'space_number': '8号'}, # 货位号 {'space_number': '10号'} # 货位号 ] #loop = asyncio.get_event_loop() #loop.run_until_complete(result) asyncio.run(execute_queue(payloads))# python3.7后支持,等同于上面两行代码 """==========返回的结果===========""" {'code': 200, 'data': None, 'message': '堆垛机已到达货位1号'} 2024-07-09 15:47:06.778444 {'code': 200, 'data': None, 'message': '堆垛机已到达货位8号'} 2024-07-09 15:47:06.778444 {'code': 200, 'data': None, 'message': '堆垛机已到达货位10号'} 2024-07-09 15:47:06.778444 """=============================="""
4.asyncio实现异步处理长时间的耗时任务
4.1 使用asyncios实现
很多时候耗时任务根本没有aiohttp这样的模块帮我们实现,所以可以用 run_in_executor
将耗时任务放入一个线程池执行器中运行,以避免阻塞事件循环所在的线程。简单理解就是调用新的线程去执行异步任务。demo如下:
import time import asyncio def task(): #某个耗时操作任务 time.sleep(2) return "ok" async def main(): loop = asyncio.get_event_loop() # 如果第一个参数不填,默认会使用一个 `ThreadPoolExecutor`,它会在一个线程池中执行函数,并返回返回一个Future对象,而Future就是一个可等待对象 res = loop.run_in_executor(None,task) result = await res print("default thread pool",result) asyncio.run(main())
实际开发中:
import time import asyncio def rfid(): time.sleep(2) return "读取RFID信息" def capture(): time.sleep(6) return "摄像头抓图" async def main(): loop = asyncio.get_event_loop() tasks = [loop.run_in_executor(None,rfid),loop.run_in_executor(None,capture)] res = await asyncio.gather(*tasks) print(res) # 判断有没有待盘点的货位,如果有就继续通知堆垛机,此处省略--- # payloads = [ # {'space_number': '1号'}, # 货位号 #] #asyncio.run(execute_queue(payloads)) if __name__ == "__main__": asyncio.run(main())
最后打印的结果是['读取RFID信息', '摄像头抓图']
。asyncio会保证两个异步任务都结束后才会继续执行execute_queue
函数,就是通知堆垛机处理剩余待盘点的货位。
4.2 使用celery实现
如果用celery则是需要用group做处理,才能保证这两个异步任务都结束,而且是并发执行,因为会同时分配给两个worker去执行。例子如下:
from celery import group # 定义多个任务 task .def rfid(): time.sleep(2) return "读取RFID信息" task .def capture(): time.sleep(6) return "摄像头抓图" # 将任务组合成一个 group job_group = group(rfid.s(), capture.s()) # 执行 group 中的任务 result = job_group.apply_async() # 等待所有任务完成并获取结果 final_result = result.get() print(final_result) # 输出所有任务的结果
5. 总结
celery和asyncio写代码都差不多,但asycio用起来更简单,更适用于网络并发请求。如果用于做耗时任务处理也可以,针对如果耗时任务只有一个,明显用celery把耗时任务转到后台处理更为合适。