Celery基本使用

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息、调度异步任务,并且提供了一整套操作此系统的一系列工具。

简介

Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,Celery架构如下图,由消息队列、任务执行单元、结果存储三部分组成。

image.png

user:任务的生产者,可以是用户(触发任务)或者celery beat(产生周期任务)
broker:消息中间件,Redis或RabbitMQ,生产者产生的任务会先存放到broker
worker:任务执行单元,执行broker中的任务
store(backend):存储任务结果

安装

pip install -U Celery

目录结构

使用下面简单的目录结构做演示,正式项目中的使用一般为多目录结构,不同的任务放到不同的task文件。

proj
  - task.py
  - app.py
  - result.py

基本使用

创建任务

# task.py

import celery

broker = 'redis://127.0.0.1:6379/1'  # redis://:password@127.0.0.1:6379/1
backend = 'redis://127.0.0.1:6379/2'
app = celery.Celery('test', backend=backend, broker=broker)

# app = celery.Celery('test')  # 效果同上
# app.conf.broker_url = broker
# app.conf.result_backend = backend

@app.task
def add(a, b):
    print(f'{a}+{b} = {a + b}')
    return a + b

• 实例化Celery对象
• 定义函数
• Celery对象task方法作为装饰器
如果要使用RabbitMQ作为消息中间件,只需修改broker,无需关心如何操作Redis或RabbitMQ。

调用任务

# app.py

from task import add

result = add.delay(1, 2)
print(result.id)  # 2a3d96fe-c3e9-4846-8237-24fc28d9ad2b

task装饰器返回一个celery Task对象,赋予了原函数Task的方法,delay方法用于调用异步任务,异步任务返回celery.result.AsyncResult对象,在执行任务的时候最主要的就是获取ID,以后可以用ID去查任务执行状态、结果等。

任务状态

# result.py

from celery.result import AsyncResult
from task import app

async_result = AsyncResult(
    id='2a3d96fe-c3e9-4846-8237-24fc28d9ad2b', app=app
)
print(async_result.status)
print(async_result.result)
print(async_result.get())

• status:任务执行状态(PENDING、STARTED、RETRY、FAILURE、SUCCESS)
• result:任务的返回值或错误信息
• get():同步的方式查结果

以上所有操作基于broker能连接上
执行add.delay后去查任务状态,一直处于PENDING,因为worker没启动,任务就存放在broker中一直没有被执行。

broker

broker选择的是Redis的数据库1,前面触发的任务存储在key为celery的list中。

image.png

启动worker

celery worker -A task -l info -P eventlet
  • -A:指定Celery对象的位置
  • -l:日志级别
  • -P:默认使用prefork管理并发,windows不支持prefork

worker启动后,可以看到部分配置信息、队列、任务,然后就会执行broker中堆积的任务,并将结果保存到backend

- ** ---------- [config]
- ** ---------- .> app:         test:0x1ade6ea95f8
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

...

[2020-08-11 01:01:02,616: INFO/MainProcess] Received task: task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b]
[2020-08-11 01:01:08,796: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:01:08,801: INFO/MainProcess] Task task.add[2a3d96fe-c3e9-4846-8237-24fc28d9ad2b] succeeded in 6.172000000002299: 3

值得注意的是tasks,列表展示所有的celery任务,后面celery-beat还会用到。

[tasks]
  . task.add

backend

到Redis查看执行结果

image.png

监控

监控使用flower,自带可视化web界面,使用pip安装

pip install -U flower

启动命令:

celery -A task flower (--port=5555) 
或
celery flower --broker=redis://127.0.0.1:6379/1

image.png

image.png

重试

可以设置任务执行失败后是否进行重试,对哪些错误进行重试,重试次数、时间间隔等。

# task

@app.task(
    autoretry_for=(Exception,),  # 指定错误码,Exception表示对所有错误进行重试
    max_retries=2,  # 重试次数
    retry_backoff=4,  # 重试时间间隔,retry_jitter为True时,时间间隔为1-retry_backoff之间随机数
    # retry_jitter=False,  # 默认为True,retry_jitter=False时,第n次重试时间为上一次重试时间retry_backoff**n秒后
)
def send_msg(msg):
    return msg[5]
# app

res1 = send_msg.delay('abcdef')
res2 = send_msg.delay('abc')

res1正常执行,res2 IndexError重试

image.png

定时任务

普通函数使用task装饰器后被封装成celery任务,可以z作为异步任务调用,也可以作为定时任务调用,具体看调用方式。
调用定时任务的两种方式:

countdown

add.apply_async(args=(1, 2), countdown=3, expires=5)

• args:函数的参数
• countdown:几秒后执行
• expires:过期时间

eta

eta:datetime、utc时间,可以使用timedelta做时间运算,设置时间上更为灵活。

add.apply_async(
    args=(1, 2), 
    eta=datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
    expires=20
)

周期任务:celery beat

如果定义了beat_schedule,在启动celery-beat后就会周期性的产生任务放到broker。

# task.py
app.conf.beat_schedule = {
    'test_cycle_task': {  # 任务name
        # 执行task下的add函数
        'task': 'task.add',  # 启动worker时监控到的任务 -> [tasks]
        # 'schedule': 5.0,  # 几秒执行一次
        'schedule': timedelta(seconds=6),  # 多久执行一次
        # 'schedule': crontab(hour=0, minute=55),  # 每天定时执行
        'args': (2, 3)  # 传递参数
    },
    'task2': {}
}

celery-beat启动命令:

celery beat -A task

子任务

任务执行成功或失败后执行一个回调函数。

task1.apply_async((1, 2), link=task2.s(3), link_error=task3.s())
task1执行成功,返回值传递给task2并且作为task2的第一个参数
task1出错,ID传递给task3并且作为task3的第一个参数

执行:add.apply_async((1, 2), link=add.s(3))
[2020-08-11 01:42:18,592: INFO/MainProcess] Received task: task.add[a7be35fd-c932-46e1-970d-0c520caecc32]
[2020-08-11 01:42:18,598: WARNING/MainProcess] 1+2 = 3
[2020-08-11 01:42:18,609: INFO/MainProcess] Received task: task.add[2e72a107-e833-4da4-a532-90e6039cc32e]
[2020-08-11 01:42:18,614: INFO/MainProcess] Task task.add[a7be35fd-c932-46e1-970d-0c520caecc32] succeeded in 0.01499999
9999417923s: 3
[2020-08-11 01:42:18,618: WARNING/MainProcess] 3+3 = 6
[2020-08-11 01:42:18,625: INFO/MainProcess] Task task.add[2e72a107-e833-4da4-a532-90e6039cc32e] succeeded in 0.0s: 6

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
18天前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务
|
消息中间件 监控 NoSQL
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用[Django+Celery+Redis实现了异步任务队列](https://v3u.cn/a_id_54),只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
|
监控 Python
Python编程:Django中使用Celery执行异步任务和定时任务
Python编程:Django中使用Celery执行异步任务和定时任务
186 0
|
消息中间件 存储 前端开发
celery 源码阅读 - 1
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。
207 0
|
消息中间件 存储 JSON
celery 源码解析 - 3
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
163 0
celery 源码解析 - 3
|
消息中间件 缓存 前端开发
神器 celery 源码解析 - 8
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
270 0
神器 celery 源码解析 - 8
|
消息中间件 监控 算法
神器 celery 源码解析 - 6
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
304 0
神器 celery 源码解析 - 6
|
消息中间件 存储 监控
神器 celery 源码解析 - 5
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
668 0
神器 celery 源码解析 - 5
|
消息中间件 网络协议 NoSQL
神器 celery 源码解析 - 2
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
266 0
神器 celery 源码解析 - 2
|
消息中间件 算法 前端开发
神器 celery 源码解析 - 7
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
321 0
神器 celery 源码解析 - 7