Celery基本使用-阿里云开发者社区

开发者社区> 数据库> 正文

Celery基本使用

简介: Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息、调度异步任务,并且提供了一整套操作此系统的一系列工具。

简介

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,Celery架构如下图,由消息队列、任务执行单元、结果存储三部分组成。

image.png

user:任务的生产者,可以是用户(触发任务)或者celery beat(产生周期任务)
broker:消息中间件,Redis或RabbitMQ,生产者产生的任务会先存放到broker
worker:任务执行单元,执行broker中的任务
store(backend):存储任务结果

安装

pip install -U Celery

目录结构

使用下面简单的目录结构做演示,正式项目中的使用一般为多目录结构,不同的任务放到不同的task文件。

proj
  - task.py
  - app.py
  - result.py

基本使用

创建任务

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:password@127.0.0.1:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b

• 实例化Celery对象
• 定义函数
• Celery对象task方法作为装饰器
如果要使用RabbitMQ作为消息中间件,只需修改broker,无需关心如何操作Redis或RabbitMQ。

调用任务

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b

task装饰器返回一个celery Task对象,赋予了原函数Task的方法,delay方法用于调用异步任务,异步任务返回celery.result.AsyncResult对象,在执行任务的时候最主要的就是获取ID,以后可以用ID去查任务执行状态、结果等。

任务状态

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())

• status:任务执行状态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任务的返回值或错误信息
• get():同步的方式查结果

以上所有操作基于broker能连接上
执行add.delay后去查任务状态,一直处于PENDING,因为worker没启动,任务就存放在broker中一直没有被执行。

broker

broker选择的是Redis的数据库1,前面触发的任务存储在key为celery的list中。

image.png

启动worker

celery worker -A task -l info -P eventlet
  • -A:指定Celery对象的位置
  • -l:日志级别
  • -P:默认使用prefork管理并发,windows不支持prefork

worker启动后,可以看到部分配置信息、队列、任务,然后就会执行broker中堆积的任务,并将结果保存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3

值得注意的是tasks,列表展示所有的celery任务,后面celery-beat还会用到。

[tasks]
  . task.add

backend

到Redis查看执行结果

image.png

监控

监控使用flower,自带可视化web界面,使用pip安装

pip install -U flower

启动命令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1

image.png

image.png

重试

可以设置任务执行失败后是否进行重试,对哪些错误进行重试,重试次数、时间间隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定错误码,Exception表示对所有错误进行重试
    max_retries=2,  # 重试次数
    retry_backoff=4,  # 重试时间间隔,retry_jitter为True时,时间间隔为1-retry_backoff之间随机数
    # retry_jitter=False,  # 默认为True,retry_jitter=False时,第n次重试时间为上一次重试时间retry_backoff**n秒后
)
def send_msg(msg):
    return msg[5]
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')

res1正常执行,res2 IndexError重试

image.png

定时任务

普通函数使用task装饰器后被封装成celery任务,可以z作为异步任务调用,也可以作为定时任务调用,具体看调用方式。
调用定时任务的两种方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)

• args:函数的参数
• countdown:几秒后执行
• expires:过期时间

eta

eta:datetime、utc时间,可以使用timedelta做时间运算,设置时间上更为灵活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)

周期任务:celery beat

如果定义了beat_schedule,在启动celery-beat后就会周期性的产生任务放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任务name
        # 执行task下的add函数
        'task': 'task.add',  # 启动worker时监控到的任务 -> [tasks]
        # 'schedule': 5.0,  # 几秒执行一次
        'schedule': timedelta(seconds=6),  # 多久执行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定时执行
        'args': (2, 3)  # 传递参数
    },
    'task2': {}
}

celery-beat启动命令:

celery beat -A task

子任务

任务执行成功或失败后执行一个回调函数。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1执行成功,返回值传递给task2并且作为task2的第一个参数
task1出错,ID传递给task3并且作为task3的第一个参数

执行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章