Celery的第一步
Celery时一个自带电池的任务队列。
本教程内容:
- 安装消息传输代理(broker)
- 安装Celery并创建第一个任务(task)
- 启动Celery工作进程(worker)并执行任务
- 追踪任务的状态
选择Broker
Celery需要一个方法来发送和接受消息,这个方法被称为消息代理(message broker)。
Celery支持多种消息代理,如RabbitMQ、Redis等。
安装RabbitMQ
:
(推荐)在Dockers上运行RabbitMQ:
docker run -d -p 5672:5672 rabbitmq
或者在Ubuntu上安装RabbitMQ:
sudo apt-get install rabbitmq-server
运行Celery worker server
celery -A tasks worker --loglevel=INFO
Windows下有个坑:celery正常启动和接收任务但不能执行,报错:ValueError: not enough values to unpack (expected 3, got 0)。
需要借助eventlet:
1.安装eventlet: pip install eventlet
2.借助eventlet启动celery: celery -A tasks worker --loglevel=INFO -P eventlet
参考1:https://www.cnblogs.com/qumogu/p/13284173.html
参考2:https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows
但这只是一个临时解决方案, celery对windows的支持很差,最好还是在Linux下运行。windows系统可以用WSL。
调用task
使用delay()
方法调用task:
在Python shell中:
from tasks import add add.delay(4, 4)
注:delay()
方法是apply_async()
方法的快捷方式。
然后,之前启动的worker进程会执行这个任务。可以在worker进程的日志中看到任务的执行情况:
[2024-04-10 21:58:25,217: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] received [2024-04-10 21:58:25,221: INFO/MainProcess] Task tasks.add[987d2e18-0090-4b5b-bcb5-bd038b9690a3] succeeded in 0.0s: 8
保留结果
如果要跟踪任务的状态, Celery需要将状态存储或发送到某个地方,如SQLAlchemy/Django ORM、MongoDB、Memcached、Redis、RPC(RabbitMQ/AMQP),并且可以自定义。
在此示例中,我们使用 rpc作为结果后端(result backend),它将状态作为暂时性消息发送回。Celery通过 backend参数 指定后端(如果选择使用配置模块,则通过result_backend设置指定)。因此,您可以在 tasks.py 文件中修改此行以启用 rpc:// 后端:
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,如果您想使用 Redis 作为结果后端,但仍然使用 RabbitMQ 作为消息代理(一种流行的组合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
现在配置了结果后端,关闭当前 python 会话并再次导入 tasks 模块以使更改生效。这一次,您将保留调用任务时返回的 AsyncResult 实例:
from tasks import add result = add.delay(4, 4)
然后可以用ready()
方法检查任务是否完成:
result.ready()
您可以等待结果完成,但很少使用,因为这会将异步调用转换为同步调用:
result.get(timeout=1) 8
Configuration
Celery就像家用电器一样,不需要太多配置。 只需要配置输入(连接到代理 broker)和输出(连接到结果后端)即可使用。但是,如果你仔细观察,你会发现有很多按钮。这就是配置选项。
默认的配置通常是足够的,但是也可以通过修改配置让Celery更适合你的需求。
可以直接在app上修改配置:
app.conf.task_serializer = 'json'
如果一次性修改多个配置,可以使用update
方法:
app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
对于较大的项目,建议使用专用的配置模块。可以用app.config_from_object()
告诉 Celery 使用配置模块:
app.config_from_object('celeryconfig')
配置模块名称通常是celeryconfig
。
该模块必须在当前目录可以访问,
celeryconfig.py
:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
要验证配置文件是否正常工作且不包含任何语法错误,可以尝试导入它:
python -m celeryconfig
下面是两个配置示例:
将行为异常的任务路由到专用队列的方式
task_routes = { 'tasks.add': 'low-priority', }
对任务进行速率限制
task_annotations = { 'tasks.add': {'rate_limit': '10/m'} }