Celery基本使用

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息、调度异步任务,并且提供了一整套操作此系统的一系列工具。

简介

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,Celery架构如下图,由消息队列、任务执行单元、结果存储三部分组成。

image.png

user:任务的生产者,可以是用户(触发任务)或者celery beat(产生周期任务)
broker:消息中间件,Redis或RabbitMQ,生产者产生的任务会先存放到broker
worker:任务执行单元,执行broker中的任务
store(backend):存储任务结果

安装

pip install -U Celery

目录结构

使用下面简单的目录结构做演示,正式项目中的使用一般为多目录结构,不同的任务放到不同的task文件。

proj
  - task.py
  - app.py
  - result.py

基本使用

创建任务

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:password@127.0.0.1:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b

• 实例化Celery对象
• 定义函数
• Celery对象task方法作为装饰器
如果要使用RabbitMQ作为消息中间件,只需修改broker,无需关心如何操作Redis或RabbitMQ。

调用任务

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b

task装饰器返回一个celery Task对象,赋予了原函数Task的方法,delay方法用于调用异步任务,异步任务返回celery.result.AsyncResult对象,在执行任务的时候最主要的就是获取ID,以后可以用ID去查任务执行状态、结果等。

任务状态

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())

• status:任务执行状态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任务的返回值或错误信息
• get():同步的方式查结果

以上所有操作基于broker能连接上
执行add.delay后去查任务状态,一直处于PENDING,因为worker没启动,任务就存放在broker中一直没有被执行。

broker

broker选择的是Redis的数据库1,前面触发的任务存储在key为celery的list中。

image.png

启动worker

celery worker -A task -l info -P eventlet
  • -A:指定Celery对象的位置
  • -l:日志级别
  • -P:默认使用prefork管理并发,windows不支持prefork

worker启动后,可以看到部分配置信息、队列、任务,然后就会执行broker中堆积的任务,并将结果保存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3

值得注意的是tasks,列表展示所有的celery任务,后面celery-beat还会用到。

[tasks]
  . task.add

backend

到Redis查看执行结果

image.png

监控

监控使用flower,自带可视化web界面,使用pip安装

pip install -U flower

启动命令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1

image.png

image.png

重试

可以设置任务执行失败后是否进行重试,对哪些错误进行重试,重试次数、时间间隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定错误码,Exception表示对所有错误进行重试
    max_retries=2,  # 重试次数
    retry_backoff=4,  # 重试时间间隔,retry_jitter为True时,时间间隔为1-retry_backoff之间随机数
    # retry_jitter=False,  # 默认为True,retry_jitter=False时,第n次重试时间为上一次重试时间retry_backoff**n秒后
)
def send_msg(msg):
    return msg[5]
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')

res1正常执行,res2 IndexError重试

image.png

定时任务

普通函数使用task装饰器后被封装成celery任务,可以z作为异步任务调用,也可以作为定时任务调用,具体看调用方式。
调用定时任务的两种方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)

• args:函数的参数
• countdown:几秒后执行
• expires:过期时间

eta

eta:datetime、utc时间,可以使用timedelta做时间运算,设置时间上更为灵活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)

周期任务:celery beat

如果定义了beat_schedule,在启动celery-beat后就会周期性的产生任务放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任务name
        # 执行task下的add函数
        'task': 'task.add',  # 启动worker时监控到的任务 -> [tasks]
        # 'schedule': 5.0,  # 几秒执行一次
        'schedule': timedelta(seconds=6),  # 多久执行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定时执行
        'args': (2, 3)  # 传递参数
    },
    'task2': {}
}

celery-beat启动命令:

celery beat -A task

子任务

任务执行成功或失败后执行一个回调函数。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1执行成功,返回值传递给task2并且作为task2的第一个参数
task1出错,ID传递给task3并且作为task3的第一个参数

执行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6

目录
相关文章
|
5月前
|
监控 NoSQL 数据可视化
Django+Celery 进阶:Flower可视化监控与排错
本文介绍了Celery命令行工具与图形监控工具的使用,涵盖查看Worker状态、任务信息及集成至Django项目的方法,同时提供Redis监控与常见问题排错方案。
440 1
|
设计模式 算法 安全
一文带你通俗理解23种软件设计模式(推荐收藏,适合小白学习,附带C++例程完整源码)
一文带你通俗理解23种软件设计模式(推荐收藏,适合小白学习,附带C++例程完整源码)
2217 0
|
3月前
|
人工智能 自然语言处理 机器人
使用 API 编程开发扣子应用
扣子(Coze)应用支持通过 API 编程,将 AI 聊天、内容生成、工作流自动化等功能集成至自有系统。主要 API 包括 Bot API(用于消息交互与会话管理)及插件与知识库 API(扩展功能与数据管理)。开发流程包括创建应用、获取密钥、调用 API 并处理响应,支持 Python 等语言。建议加强错误处理、密钥安全与会话管理,提升集成灵活性与应用扩展性。
1005 0
|
10月前
|
人工智能 关系型数据库 分布式数据库
2025阿里云PolarDB开发者大会来了!
在数字化浪潮中,AI与数据库的融合正重塑行业格局。2025年2月26日(周三),诚邀您在北京朝阳区嘉瑞文化中心参会,探讨数据技术发展与AI时代的无限可能。线上直播同步进行,欢迎参与!
2025阿里云PolarDB开发者大会来了!
|
机器学习/深度学习 人工智能 自然语言处理
软件测试中的人工智能革命:现状与未来展望
【10月更文挑战第2天】 本文深入探讨了人工智能在软件测试领域的应用现状、面临的挑战以及未来的发展方向。通过分析AI技术如何提高测试效率、准确性和自动化水平,文章揭示了AI在改变传统软件测试模式中的关键作用。同时,指出了当前AI测试工具的局限性,并对未来AI与软件测试深度融合的前景进行了展望,强调了技术创新对于提升软件质量的重要性。
446 4
|
NoSQL 调度 Redis
Celery
【10月更文挑战第10天】
348 4
|
存储 安全 算法
加盐哈希的科学原理及其重要性
【8月更文挑战第31天】
758 0
|
分布式计算 算法 Java
阿里云ODPS PySpark任务使用mmlspark/synapseml运行LightGBM进行Boosting算法的高效训练与推理
阿里云ODPS PySpark任务使用mmlspark/synapseml运行LightGBM进行Boosting算法的高效训练与推理
1289 3
|
Linux 网络安全 UED
在Linux中, 什么是命令行界面(CLI)和图形用户界面(GUI)?
在Linux中, 什么是命令行界面(CLI)和图形用户界面(GUI)?