Celery 是一个强大的异步任务队列/作业队列,基于分布式消息传递。它专注于实时操作,但也支持任务调度。以下是如何使用 Celery 的基本步骤和代码示例:
安装 Celery
首先,使用 pip 安装 Celery:
pip install celery
创建 Celery 实例
创建一个 Celery 实例,并指定消息代理(broker)和结果后端(backend)。
from celery import Celery
app = Celery(
'my_app',
broker='redis://localhost:6379/0', # 消息代理
backend='redis://localhost:6379/0', # 结果后端
)
定义任务
使用 @app.task
装饰器定义一个任务。
@app.task
def add(x, y):
return x + y
启动 Worker
在命令行中启动 Celery worker 来执行任务。
celery -A my_app worker --loglevel=info
调用任务
使用 delay
方法异步发送任务到 Celery。
result = add.delay(4, 4)
获取任务结果
使用返回的 AsyncResult
对象获取任务结果。
from celery.result import AsyncResult
task_id = result.id
async_result = AsyncResult(task_id, app=app)
print(async_result.get()) # 获取任务结果
定时任务
Celery 还支持定时任务,可以使用定时器(beat)来调度周期性任务。
from celery.schedules import crontab
app.conf.beat_schedule = {
'run-every-5-minutes': {
'task': 'my_app.add',
'schedule': 300.0, # 5分钟执行一次
},
}
代码示例
以下是一个完整的 Celery 任务示例:
```python
from celery import Celery
from celery.schedules import crontab
创建 Celery 实例
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
定义任务
@app.task
def add(x, y):
return x + y
定义周期性任务
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'my_app.add',
'schedule': 30.0, # 30秒执行一次
'args': (16, 16),
},
}