celery

简介: celery

1.初始celery

安装:pip install celery

启动命令:celery -A celery_task worker -l info -P gevent

启动成功:

E:\dayData\celery学习目录\基本使用>celery -A celery_task worker -l info -P gevent

 -------------- celery@DESKTOP-L7PAGFQ v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.17763-SP0 2021-09-06 10:02:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         test:0x292d91ade48
- ** ---------- .> transport:   redis://wusen0601.xyz:6379/2
- ** ---------- .> results:     redis://wusen0601.xyz:6379/1
- *** --- * --- .> concurrency: 4 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_task.send_email

[2021-09-06 10:02:34,612: INFO/MainProcess] Connected to redis://wusen0601.xyz:6379/2
[2021-09-06 10:02:34,698: INFO/MainProcess] mingle: searching for neighbors
[2021-09-06 10:02:35,993: INFO/MainProcess] mingle: all alone
[2021-09-06 10:02:36,233: INFO/MainProcess] celery@DESKTOP-L7PAGFQ ready.
[2021-09-06 10:02:36,253: INFO/MainProcess] pidbox: Connected to redis://wusen0601.xyz:6379/2.
[2021-09-06 10:02:50,378: INFO/MainProcess] Task celery_task.send_email[6e786d9d-629a-46f3-a647-3aacaa1
773b8] received
[2021-09-06 10:02:50,380: WARNING/MainProcess] 给wusen发邮件。。。。。
[2021-09-06 10:02:50,380: WARNING/MainProcess]

[2021-09-06 10:02:50,408: INFO/MainProcess] Task celery_task.send_email[a63bdc18-4cde-4f84-9558-b6c5f17
2de3f] received
[2021-09-06 10:02:50,410: WARNING/MainProcess] 给wusen发邮件。。。。。
[2021-09-06 10:02:50,411: WARNING/MainProcess]

[2021-09-06 10:02:55,381: WARNING/MainProcess] 给wusen发邮件完成
[2021-09-06 10:02:55,384: WARNING/MainProcess]

[2021-09-06 10:02:55,412: WARNING/MainProcess] 给wusen发邮件完成
[2021-09-06 10:02:55,412: WARNING/MainProcess]

View Code

 

版本:win10 + py3.7 + celery5.1.2

理解:celery高版本对我们很不友好(pip install gevent)

2.celery

组成部分:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)

使用场景:异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

     定时任务:定时执行某件事情,比如每天数据统计

3.优点

Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。

Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

View Code

4.异步任务

 4.1简单结构  

   4.1.1创建异步任务执行文件celery_task:

import celery
import time
backend = "redis://wusen0601.xyz:6379/1"
broker = "redis://wusen0601.xyz:6379/2"
cel = celery.Celery("test",backend=backend,broker=broker)

@cel.task
def send_email(name):
    print(f"给{name}发邮件。。。。。")
    time.sleep(5)
    print(f"给{name}发邮件完成")
    return "OK"

View Code

   4.1.2创建执行任务文件,produce_task.py:

from icecream import ic
from celery_task import send_email
result1 = send_email.delay("wusen")
ic(result1.id)
result2 = send_email.delay("wusen")
ic(result2.id)

View Code

   4.1.3创建py文件:result.py,查看任务执行结果,

from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

View Code

  4.2多任务模式

image.png

 4.2.1启动方式:你要在项目根目录下面去启动

     celery -A ct.celery_task worker -l info -P gevent

   4.2.2创建celery_task.py文件

from celery import Celery

cel = Celery(
    "celery_demo",
    broker="redis://wusen0601.xyz:6379/1",
    backend="redis://wusen0601.xyz:6379/2",
#     包含两个任务
    include=[
        "celery_tasks.task01",
        "celery_tasks.task02",
    ]

)
# 时区
cel.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
cel.conf.enable_utc = False

View Code

   4.2.3创建任务一 task01.py

import time
from .celery_task import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return f"向{res}发邮件任务完成"

View Code

   4.2.4创建任务二 task02.py

import time
from .celery_task import cel

@cel.task
def send_msg(res):
    time.sleep(5)
    return f"向{res}发信息任务完成"

View Code

   4.2.5创建生产者  produce_task.py

from .ct.task01 import send_email
from .ct.task02 import send_msg

# 立刻告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('wusen')
print(result.id)
result = send_msg.delay('wusen')
print(result.id)

View Code

   4.2.6创建检测结果check_result.py

from celery.result import AsyncResult
from .ct.celery_task import cel

async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除,执行完成,结果不会自动删除
    # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

View Code

  4.3定时任务之:简单模式(修改一下produce_task.py文件)  

from celery_task import send_email
from datetime import datetime

# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
相关文章
|
1天前
|
Python
django之使用celery
django之使用celery
|
1月前
|
消息中间件 监控 调度
Celery与RabbitMQ的结合【Python】
【8月更文挑战第18天】 Celery与RabbitMQ结合是构建高效Python分布式系统的利器。Celery作为分布式任务队列,支持任务调度与结果管理;RabbitMQ则确保了消息的可靠传递。二者联用不仅提升了系统的异步处理能力,还增强了其扩展性与可靠性。通过简单的安装与配置,即可实现任务的异步执行与调度,同时利用监控工具优化性能并确保安全性。这种组合适用于需要处理大量异步任务的应用场景,极大地简化了分布式系统的设计与实现。
40 0
|
3月前
|
消息中间件 NoSQL Redis
【译】Celery文档1:First Steps with Celery——安装和配置Celery
【译】Celery文档1:First Steps with Celery——安装和配置Celery
71 14
|
3月前
|
消息中间件 存储 前端开发
【译】Celery文档2:Next Steps——在项目中使用Celery
【译】Celery文档2:Next Steps——在项目中使用Celery
|
3月前
|
数据库 Python
【译】Celery文档3:在Django中使用Celery
【译】Celery文档3:在Django中使用Celery
|
消息中间件 开发框架 NoSQL
celery--介绍
celery--介绍
|
缓存 NoSQL Redis
celery--实现异步任务
celery--实现异步任务
|
消息中间件 监控 NoSQL
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用[Django+Celery+Redis实现了异步任务队列](https://v3u.cn/a_id_54),只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
|
NoSQL API 调度
Celery初探
Celery初探
229 0