简介
Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,Celery架构如下图,由消息队列、任务执行单元、结果存储三部分组成。
user:任务的生产者,可以是用户(触发任务)或者celery beat(产生周期任务)
broker:消息中间件,Redis或RabbitMQ,生产者产生的任务会先存放到broker
worker:任务执行单元,执行broker中的任务
store(backend):存储任务结果
安装
pip install -U Celery
目录结构
使用下面简单的目录结构做演示,正式项目中的使用一般为多目录结构,不同的任务放到不同的task文件。
proj
- task.py
- app.py
- result.py
基本使用
创建任务
# task.py
import celery
broker = 'redis://127.0.0.1:6379/1' # redis://:password@127.0.0.1:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)
# app = celery.Celery('test') # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend
@app.task
def add(a, b):
print(f'{a}+{b} = {a + b}')
return a + b
• 实例化Celery对象
• 定义函数
• Celery对象task方法作为装饰器
如果要使用RabbitMQ作为消息中间件,只需修改broker,无需关心如何操作Redis或RabbitMQ。
调用任务
# app.py
from task import add
result = add.delay(1, 2)
print(result.id) # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b
task装饰器返回一个celery Task对象,赋予了原函数Task的方法,delay方法用于调用异步任务,异步任务返回celery.result.AsyncResult对象,在执行任务的时候最主要的就是获取ID,以后可以用ID去查任务执行状态、结果等。
任务状态
# result.py
from celery.result import AsyncResult
from task import app
async_result = AsyncResult(
id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())
• status:任务执行状态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任务的返回值或错误信息
• get():同步的方式查结果
以上所有操作基于broker能连接上
执行add.delay后去查任务状态,一直处于PENDING,因为worker没启动,任务就存放在broker中一直没有被执行。
broker
broker选择的是Redis的数据库1,前面触发的任务存储在key为celery的list中。
启动worker
celery worker -A task -l info -P eventlet
- -A:指定Celery对象的位置
- -l:日志级别
- -P:默认使用prefork管理并发,windows不支持prefork
worker启动后,可以看到部分配置信息、队列、任务,然后就会执行broker中堆积的任务,并将结果保存到backend
- ** ---------- [config]
- ** ---------- .> app: test:0x1ade6ea95f8
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. task.add
...
[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3
值得注意的是tasks,列表展示所有的celery任务,后面celery-beat还会用到。
[tasks]
. task.add
backend
到Redis查看执行结果
监控
监控使用flower,自带可视化web界面,使用pip安装
pip install -U flower
启动命令:
celery -A task flower (--port=5555)
或
celery flower --broker=redis://127.0.0.1:6379/1
重试
可以设置任务执行失败后是否进行重试,对哪些错误进行重试,重试次数、时间间隔等。
# task
@app.task(
autoretry_for=(Exception,), # 指定错误码,Exception表示对所有错误进行重试
max_retries=2, # 重试次数
retry_backoff=4, # 重试时间间隔,retry_jitter为True时,时间间隔为1-retry_backoff之间随机数
# retry_jitter=False, # 默认为True,retry_jitter=False时,第n次重试时间为上一次重试时间retry_backoff**n秒后
)
def send_msg(msg):
return msg[5]
# app
res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')
res1正常执行,res2 IndexError重试
定时任务
普通函数使用task装饰器后被封装成celery任务,可以z作为异步任务调用,也可以作为定时任务调用,具体看调用方式。
调用定时任务的两种方式:
countdown
add.apply_async(args=(1, 2), countdown=3, expires=5)
• args:函数的参数
• countdown:几秒后执行
• expires:过期时间
eta
eta:datetime、utc时间,可以使用timedelta做时间运算,设置时间上更为灵活。
add.apply_async(
args=(1, 2),
eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
expires=20
)
周期任务:celery beat
如果定义了beat_schedule,在启动celery-beat后就会周期性的产生任务放到broker。
# task.py
app.conf.beat_schedule = {
'test_cycle_task': { # 任务name
# 执行task下的add函数
'task': 'task.add', # 启动worker时监控到的任务 -> [tasks]
# 'schedule': 5.0, # 几秒执行一次
'schedule': timedelta(seconds=6), # 多久执行一次
# 'schedule': crontab(hour=0, minute=55), # 每天定时执行
'args': (2, 3) # 传递参数
},
'task2': {}
}
celery-beat启动命令:
celery beat -A task
子任务
任务执行成功或失败后执行一个回调函数。
task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1执行成功,返回值传递给task2并且作为task2的第一个参数
task1出错,ID传递给task3并且作为task3的第一个参数
执行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6