celery--实现异步任务

简介: celery--实现异步任务

前戏


已经安装好了celery,redis模块,还安装好了redis服务。

新建两个py文件,一个为task,一个为demo,内容如下。

task.py

import time
def test(name):
    time.sleep(5)
    print(f'hello {name}')
    return f'{name}'

demo.py

from task import test
if __name__ == '__main__':
    test('jack')

这两个文件内容相信大家都能看懂(看不懂也没关系)

执行demo.py会输出hello jack


使用celery实现异步


声明:本人的redis服务和代码都在远程服务器上,本地的pycharm也上连接的远程服务器

修改task.py

import time
from celery import Celery
# 实例化Celery
app = Celery('celery_test', broker='redis://:redis666@127.0.0.1:6379', backend='redis://:redis666@127.0.0.1:6379')
# redis666为redis的密码,前面要加: 后面要加@
@app.task
def test(name):
    time.sleep(5)
    print(f'hello {name}')
    return 'wahaha'

其中的broker是存放缓存任务的。backend是存放缓存任务结果的。这里是存放在redis里。

修改demo.py

from task import test
if __name__ == '__main__':
    # 将任务存放在broker里
    test.delay('jack')  # 这样调用

在服务器上启动celery worker,我的是在虚拟环境里安装的celery,所以要进入到虚拟环境在执行

celery -A task worker -l INFO

也可以加上-c参数  celery worker -A task -c 6 -l INFO

其中 -A task 表示只运行task里的任务。-c 启动的worker数量,这里为6。-l 日志级别,这里为INFO级别。

执行demo.py,查看celery的work


celery的配置文件抽取


目录结构如下

apps下的__init__.py内容

from celery import Celery
app = Celery('test_task')
app.config_from_object('apps.celery_conf')

celery_conf.py的内容

BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
# 需要导入task1和task2
CELERY_IMPORTS = (
    'apps.task1',
    'apps.task2'
)

task1.py的内容

from  apps import app
@app.task
def add(x,y):
    return x+y
task2.py的内容
from  apps import app
@app.task
def subs(x,y):
    return x-y

demo.py的内容

from apps.task1 import add
from apps.task2 import subs
if __name__ == '__main__':
    add.delay(3,5)
    subs.delay(8,6)

启动celery worker

运行demo.py


celery的常用配置


上面我们创建里celery_conf.py文件,用来存放celery的配置。其他的配置我们也可以写在里面,常用的配置如下

BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
# 需要导入task1和task2
CELERY_IMPORTS = (
    'apps.task1',
    'apps.task2'
)
# 时区设置。默认UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
# 是否使用本地的时区,False时将使用本地的时区
CELERY_ENABLE_UTC = False
# 重写task的属性,限制tasks模块下的add函数,每秒钟只能执行10次
CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
# 连接错误情况下是否重试发布任务消息,默认为True
CELERY_TASK_PUBLISH_RETRY = False
# 并发的worker数量,也是命令行-c指定的数目
# 事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了
CELERYD_CONCURRENCY = 20
# 每次worker去任务队列中取的任务数量
CELERY_PREFETH_MULTIPLIRE = 5
# 每个worker执行多少次被杀掉
CELERYD_MAX_TASKS_PER_CHILD = 200
# 单个任务的最大执行时间
CELERY_TASK_TIME_LIMIT = 60
# celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 1000


wins下启动worker报错


如果你在wins下执行了 celery worker -A task -l INFO,运行demo.py后,celery报如下错误

这是因为3.x之后的celery不支持wins导致的。我们只需要在安装一个第三方库eventlet就可以了

pip install eventlet

然后我们启动worker就不能以上面的方式启动了,需要加个 -P 参数

celery worker -A task -l INFO -P eventlet

然后执行demo.py,worker就不会报错了


相关文章
|
Web App开发 iOS开发 Windows
ios获取原生系统应用的包名
ios获取原生系统应用的包名
3058 0
|
2月前
|
人工智能 搜索推荐 数据可视化
2025年国内知名智能营销产品(企业级智能营销系统)深度解析:功能亮点与市场排名
本文深度解析瓴羊Quick Audience、神策、致趣百川等主流用户智能运营产品,在功能、市场表现与行业应用三大维度对比,助力企业根据业务需求精准选型,提升运营效率与竞争力。2025年国内知名智能营销产品(企业级智能营销系统)深度解析:功能亮点与市场排名。
|
5月前
|
安全 数据可视化 数据管理
国内主流低代码开发平台解析与盘点
本文系统梳理了当前主流低代码开发平台,涵盖通用型、垂直行业型、流程自动化型、数据库驱动型及移动应用优先型平台,分析了其功能特点、技术架构与适用场景,并从企业需求、规模、预算及技术支持等方面提供选型建议。文章指出,低代码平台正加速与AI、边缘计算等技术融合,推动企业数字化转型。
308 1
|
Python
PyCharm在用Django开发时debug模式启动失败显示can't find '__main__' module的解决方法
初次用Django开发web应用,在试图用Pycharm进行debug的时候,出现了一个奇怪的问题。以正常模式启动或者在terminal启动都没有问题。但是以debug模式启动时,显示`can't find '__main__' module”`报错。在网上找了很久都没有看到解决方法,最后在某乎上看到一篇文章,在启动时加上`--noreload`参数,既可以debug模式启动。
540 0
|
Oracle 关系型数据库 数据管理
企业进销存管理系统的设计与实现_kaic
企业进销存管理系统的设计与实现_kaic
|
网络协议 安全 网络安全
IPv4 地址耗尽,为什么 IPv6 没有广泛将其取代?
IPv4 地址耗尽,为什么 IPv6 没有广泛将其取代?
498 0
|
数据采集 分布式计算 大数据
森马基于MaxCompute+Hologres+DataWorks构建数据中台
本次案例主要分享森马集团面对多年自建的多套数仓产品体系,通过阿里云MaxCompute+Hologres+DataWorks统一数仓平台,保障数据生产稳定性与数据质量,减少ETL链路及计算时间,每年数仓整体费用从300多万降到180万。
|
前端开发 Go API
Gin vs Beego: Golang的Web框架之争
Gin vs Beego: Golang的Web框架之争
|
存储 运维 JavaScript
云HIS是什么?HIS系统为什么要上云?云HIS有哪些优点?
云HIS的主要功能作用是提供四个面向的服务,即面向居民的健康服务、面向医疗机构的医疗服务、面向各级管理机关的卫生管理服务、面向其它卫生机构的卫生协同服务。
1117 1
|
消息中间件 存储 负载均衡
RocketMQ Connect 构建流式数据处理平台
RocketMQ Connect 作为 RocketMQ 与其他系统间流式数据传输的重要工具,轻松将 RocketMQ 与其他存储技术进行集成,并实现低延迟流/批处理。
659 1
RocketMQ Connect 构建流式数据处理平台