Python编程:Celery执行异步任务和定时任务

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Python编程:Celery执行异步任务和定时任务

image.png

1、编写函数celery_app.py

from celery import Celery
import time
broker = "redis://localhost:6379/1"
backend = "redis://localhost:6379/2"
app = Celery("my_task", broker=broker, backend=backend)
@app.task(name="task")
def add(a, b):
    print("coming...")
    time.sleep(5)
    return a + b
if __name__ == '__main__':
    result = add(1, 2)
    print(result)

2、启动worker


$ celery worker -A celery_app -l INFO

参数:

A: app文件名称

l:日志级别


3、启动任务

> from celery_app import add
> result = add.delay(3, 4)
> result.ready()
> result.get()

4、工程化使用

目录结构

├── app.py
└── celery_app
    ├── __init__.py
    ├── celeryconfig.py
    ├── task1.py
    └── task2.py

实例化Celery __init__.py

# -*- coding: utf-8 -*-
from celery import Celery
app = Celery(__file__)
# 加载配置模块
app.config_from_object("celery_app.celeryconfig")

配置文件 celeryconfig.py


# -*- encoding:utf-8 -*-
# celery配置文件
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 设置时区,默认UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
# 导入指定的任务模块
CELERY_IMPORTS = (
    "celery_app.task1",
    "celery_app.task2"
)

任务文件 task1.py

# -*- coding: utf-8 -*-
import time
from celery_app import app
@app.task
def add(x, y):
    time.sleep(3)
    return x + y

任务文件 task2.py

# -*- coding: utf-8 -*-
import time
from celery_app import app
@app.task
def multiply(x, y):
    time.sleep(5)
    return x * y

启动worker

$ celery worker -A celery_app -l INFO

5、定时任务

可以直接配置到配置文件中 celeryconfig.py


from datetime import timedelta
from celery.schedules import crontab
# 设置定时任务
CELERYBEAT_SCHEDULE = {
    "task1": {
        "task": "celery_app.task1.add",
        "schedule": timedelta(seconds=10),
        "args": (2, 8)
    },
    "task2": {
        "task": "celery_app.task1.add",
        "schedule": crontab(hour=14, minute=59),
        "args": (2, 8)
    }
}

启动定时任务


$ celery beat -A celery_app -l INFO

celery 4.1.0 时区bug -> 4.0.2


一条命令启动异步任务和定时任务


$ celery -B -A celery_app worker -l INFO

相关文章
|
3月前
|
数据采集 机器学习/深度学习 人工智能
Python:现代编程的首选语言
Python:现代编程的首选语言
286 102
|
3月前
|
数据采集 机器学习/深度学习 算法框架/工具
Python:现代编程的瑞士军刀
Python:现代编程的瑞士军刀
310 104
|
2月前
|
Python
Python编程:运算符详解
本文全面详解Python各类运算符,涵盖算术、比较、逻辑、赋值、位、身份、成员运算符及优先级规则,结合实例代码与运行结果,助你深入掌握Python运算符的使用方法与应用场景。
179 3
|
2月前
|
数据处理 Python
Python编程:类型转换与输入输出
本教程介绍Python中输入输出与类型转换的基础知识,涵盖input()和print()的使用,int()、float()等类型转换方法,并通过综合示例演示数据处理、错误处理及格式化输出,助你掌握核心编程技能。
411 3
|
2月前
|
并行计算 安全 计算机视觉
Python多进程编程:用multiprocessing突破GIL限制
Python中GIL限制多线程性能,尤其在CPU密集型任务中。`multiprocessing`模块通过创建独立进程,绕过GIL,实现真正的并行计算。它支持进程池、队列、管道、共享内存和同步机制,适用于科学计算、图像处理等场景。相比多线程,多进程更适合利用多核优势,虽有较高内存开销,但能显著提升性能。合理使用进程池与通信机制,可最大化效率。
259 3
|
2月前
|
存储 Java 调度
Python定时任务实战:APScheduler从入门到精通
APScheduler是Python强大的定时任务框架,通过触发器、执行器、任务存储和调度器四大组件,灵活实现各类周期性任务。支持内存、数据库、Redis等持久化存储,适用于Web集成、数据抓取、邮件发送等场景,解决传统sleep循环的诸多缺陷,助力构建稳定可靠的自动化系统。(238字)
480 1
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
250 0
|
安全 Unix Shell
Python 异步: 在非阻塞子进程中运行命令(19)
Python 异步: 在非阻塞子进程中运行命令(19)
1064 0
|
调度 Python
Python3的原生协程(Async/Await)和Tornado异步非阻塞
我们知道在程序在执行 IO 密集型任务的时候,程序会因为等待 IO 而阻塞,而协程作为一种用户态的轻量级线程,可以帮我们解决这个问题。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,在调度回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合
Python3的原生协程(Async/Await)和Tornado异步非阻塞
|
网络协议 网络安全 数据安全/隐私保护
Python 异步: 非阻塞流(20)
Python 异步: 非阻塞流(20)

推荐镜像

更多