celery

简介: celery

1.初始celery

安装:pip install celery

启动命令:celery -A celery_task worker -l info -P gevent

启动成功:

E:\dayData\celery学习目录\基本使用>celery -A celery_task worker -l info -P gevent

 -------------- celery@DESKTOP-L7PAGFQ v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.17763-SP0 2021-09-06 10:02:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         test:0x292d91ade48
- ** ---------- .> transport:   redis://wusen0601.xyz:6379/2
- ** ---------- .> results:     redis://wusen0601.xyz:6379/1
- *** --- * --- .> concurrency: 4 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_task.send_email

[2021-09-06 10:02:34,612: INFO/MainProcess] Connected to redis://wusen0601.xyz:6379/2
[2021-09-06 10:02:34,698: INFO/MainProcess] mingle: searching for neighbors
[2021-09-06 10:02:35,993: INFO/MainProcess] mingle: all alone
[2021-09-06 10:02:36,233: INFO/MainProcess] celery@DESKTOP-L7PAGFQ ready.
[2021-09-06 10:02:36,253: INFO/MainProcess] pidbox: Connected to redis://wusen0601.xyz:6379/2.
[2021-09-06 10:02:50,378: INFO/MainProcess] Task celery_task.send_email[6e786d9d-629a-46f3-a647-3aacaa1
773b8] received
[2021-09-06 10:02:50,380: WARNING/MainProcess] 给wusen发邮件。。。。。
[2021-09-06 10:02:50,380: WARNING/MainProcess]

[2021-09-06 10:02:50,408: INFO/MainProcess] Task celery_task.send_email[a63bdc18-4cde-4f84-9558-b6c5f17
2de3f] received
[2021-09-06 10:02:50,410: WARNING/MainProcess] 给wusen发邮件。。。。。
[2021-09-06 10:02:50,411: WARNING/MainProcess]

[2021-09-06 10:02:55,381: WARNING/MainProcess] 给wusen发邮件完成
[2021-09-06 10:02:55,384: WARNING/MainProcess]

[2021-09-06 10:02:55,412: WARNING/MainProcess] 给wusen发邮件完成
[2021-09-06 10:02:55,412: WARNING/MainProcess]

View Code

 

版本:win10 + py3.7 + celery5.1.2

理解:celery高版本对我们很不友好(pip install gevent)

2.celery

组成部分:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)

使用场景:异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

     定时任务:定时执行某件事情,比如每天数据统计

3.优点

Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。

Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

View Code

4.异步任务

 4.1简单结构  

   4.1.1创建异步任务执行文件celery_task:

import celery
import time
backend = "redis://wusen0601.xyz:6379/1"
broker = "redis://wusen0601.xyz:6379/2"
cel = celery.Celery("test",backend=backend,broker=broker)

@cel.task
def send_email(name):
    print(f"给{name}发邮件。。。。。")
    time.sleep(5)
    print(f"给{name}发邮件完成")
    return "OK"

View Code

   4.1.2创建执行任务文件,produce_task.py:

from icecream import ic
from celery_task import send_email
result1 = send_email.delay("wusen")
ic(result1.id)
result2 = send_email.delay("wusen")
ic(result2.id)

View Code

   4.1.3创建py文件:result.py,查看任务执行结果,

from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

View Code

  4.2多任务模式

image.png

 4.2.1启动方式:你要在项目根目录下面去启动

     celery -A ct.celery_task worker -l info -P gevent

   4.2.2创建celery_task.py文件

from celery import Celery

cel = Celery(
    "celery_demo",
    broker="redis://wusen0601.xyz:6379/1",
    backend="redis://wusen0601.xyz:6379/2",
#     包含两个任务
    include=[
        "celery_tasks.task01",
        "celery_tasks.task02",
    ]

)
# 时区
cel.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
cel.conf.enable_utc = False

View Code

   4.2.3创建任务一 task01.py

import time
from .celery_task import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return f"向{res}发邮件任务完成"

View Code

   4.2.4创建任务二 task02.py

import time
from .celery_task import cel

@cel.task
def send_msg(res):
    time.sleep(5)
    return f"向{res}发信息任务完成"

View Code

   4.2.5创建生产者  produce_task.py

from .ct.task01 import send_email
from .ct.task02 import send_msg

# 立刻告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('wusen')
print(result.id)
result = send_msg.delay('wusen')
print(result.id)

View Code

   4.2.6创建检测结果check_result.py

from celery.result import AsyncResult
from .ct.celery_task import cel

async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除,执行完成,结果不会自动删除
    # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

View Code

  4.3定时任务之:简单模式(修改一下produce_task.py文件)  

from celery_task import send_email
from datetime import datetime

# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
相关文章
|
8天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
4天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2465 14
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
4天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1505 14
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
1月前
|
运维 Cloud Native Devops
一线实战:运维人少,我们从 0 到 1 实践 DevOps 和云原生
上海经证科技有限公司为有效推进软件项目管理和开发工作,选择了阿里云云效作为 DevOps 解决方案。通过云效,实现了从 0 开始,到现在近百个微服务、数百条流水线与应用交付的全面覆盖,有效支撑了敏捷开发流程。
19274 29
|
1月前
|
人工智能 自然语言处理 搜索推荐
阿里云Elasticsearch AI搜索实践
本文介绍了阿里云 Elasticsearch 在AI 搜索方面的技术实践与探索。
18822 20
|
1月前
|
Rust Apache 对象存储
Apache Paimon V0.9最新进展
Apache Paimon V0.9 版本即将发布,此版本带来了多项新特性并解决了关键挑战。Paimon自2022年从Flink社区诞生以来迅速成长,已成为Apache顶级项目,并广泛应用于阿里集团内外的多家企业。
17515 13
Apache Paimon V0.9最新进展
|
6天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
375 11
|
1月前
|
存储 人工智能 前端开发
AI 网关零代码解决 AI 幻觉问题
本文主要介绍了 AI Agent 的背景,概念,探讨了 AI Agent 网关插件的使用方法,效果以及实现原理。
18698 16
|
3天前
|
算法 Java
JAVA并发编程系列(8)CountDownLatch核心原理
面试中的编程题目“模拟拼团”,我们通过使用CountDownLatch来实现多线程条件下的拼团逻辑。此外,深入解析了CountDownLatch的核心原理及其内部实现机制,特别是`await()`方法的具体工作流程。通过详细分析源码与内部结构,帮助读者更好地理解并发编程的关键概念。
|
3天前
|
SQL 监控 druid
Druid连接池学习
Druid学习笔记,使用Druid进行密码加密。参考文档:https://github.com/alibaba/druid
197 82