celery--实现异步任务

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: celery--实现异步任务

前戏


已经安装好了celery,redis模块,还安装好了redis服务。

新建两个py文件,一个为task,一个为demo,内容如下。

task.py

import time
def test(name):
    time.sleep(5)
    print(f'hello {name}')
    return f'{name}'

demo.py

from task import test
if __name__ == '__main__':
    test('jack')

这两个文件内容相信大家都能看懂(看不懂也没关系)

执行demo.py会输出hello jack


使用celery实现异步


声明:本人的redis服务和代码都在远程服务器上,本地的pycharm也上连接的远程服务器

修改task.py

import time
from celery import Celery
# 实例化Celery
app = Celery('celery_test', broker='redis://:redis666@127.0.0.1:6379', backend='redis://:redis666@127.0.0.1:6379')
# redis666为redis的密码,前面要加: 后面要加@
@app.task
def test(name):
    time.sleep(5)
    print(f'hello {name}')
    return 'wahaha'

其中的broker是存放缓存任务的。backend是存放缓存任务结果的。这里是存放在redis里。

修改demo.py

from task import test
if __name__ == '__main__':
    # 将任务存放在broker里
    test.delay('jack')  # 这样调用

在服务器上启动celery worker,我的是在虚拟环境里安装的celery,所以要进入到虚拟环境在执行

celery -A task worker -l INFO

也可以加上-c参数  celery worker -A task -c 6 -l INFO

其中 -A task 表示只运行task里的任务。-c 启动的worker数量,这里为6。-l 日志级别,这里为INFO级别。

执行demo.py,查看celery的work


celery的配置文件抽取


目录结构如下

apps下的__init__.py内容

from celery import Celery
app = Celery('test_task')
app.config_from_object('apps.celery_conf')

celery_conf.py的内容

BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
# 需要导入task1和task2
CELERY_IMPORTS = (
    'apps.task1',
    'apps.task2'
)

task1.py的内容

from  apps import app
@app.task
def add(x,y):
    return x+y
task2.py的内容
from  apps import app
@app.task
def subs(x,y):
    return x-y

demo.py的内容

from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
    add.delay(3,5)
    subs.delay(8,6)

启动celery worker

运行demo.py


celery的常用配置


上面我们创建里celery_conf.py文件,用来存放celery的配置。其他的配置我们也可以写在里面,常用的配置如下

BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
# 需要导入task1和task2
CELERY_IMPORTS = (
    'apps.task1',
    'apps.task2'
)
# 时区设置。默认UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用本地的时区,False时将使用本地的时区
CELERY_ENABLE_UTC = False
# 重写task的属性,限制tasks模块下的add函数,每秒钟只能执行10次
CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
# 连接错误情况下是否重试发布任务消息,默认为True
CELERY_TASK_PUBLISH_RETRY = False
# 并发的worker数量,也是命令行-c指定的数目
# 事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了
CELERYD_CONCURRENCY = 20
# 每次worker去任务队列中取的任务数量
CELERY_PREFETH_MULTIPLIRE = 5
# 每个worker执行多少次被杀掉
CELERYD_MAX_TASKS_PER_CHILD = 200
# 单个任务的最大执行时间
CELERY_TASK_TIME_LIMIT = 60
# celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 1000


wins下启动worker报错


如果你在wins下执行了 celery worker -A task -l INFO,运行demo.py后,celery报如下错误

这是因为3.x之后的celery不支持wins导致的。我们只需要在安装一个第三方库eventlet就可以了

pip install eventlet

然后我们启动worker就不能以上面的方式启动了,需要加个 -P 参数

celery worker -A task -l INFO -P eventlet

然后执行demo.py,worker就不会报错了


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL 调度 Redis
Celery
【10月更文挑战第10天】
39 4
|
3月前
|
NoSQL 大数据 Redis
使用 Flask 和 Celery 构建异步任务处理
使用 Flask 和 Celery 构建异步任务处理
119 2
|
3月前
|
消息中间件 存储 BI
|
6月前
|
消息中间件 NoSQL Redis
【译】Celery文档1:First Steps with Celery——安装和配置Celery
【译】Celery文档1:First Steps with Celery——安装和配置Celery
102 14
|
消息中间件
celery--调用异步任务的三种方法和task参数
celery--调用异步任务的三种方法和task参数
|
消息中间件 开发框架 NoSQL
celery--介绍
celery--介绍
|
Python
Python编程:Celery执行异步任务和定时任务
Python编程:Celery执行异步任务和定时任务
279 0
Python编程:Celery执行异步任务和定时任务
|
NoSQL API 调度
Celery初探
Celery初探
246 0
|
监控 Python
Python编程:Django中使用Celery执行异步任务和定时任务
Python编程:Django中使用Celery执行异步任务和定时任务
237 0