1.初始celery
安装:pip install celery
启动命令:celery -A celery_task worker -l info -P gevent
启动成功:
E:\dayData\celery学习目录\基本使用>celery -A celery_task worker -l info -P gevent -------------- celery@DESKTOP-L7PAGFQ v5.1.2 (sun-harmonics) --- ***** ----- -- ******* ---- Windows-10-10.0.17763-SP0 2021-09-06 10:02:34 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: test:0x292d91ade48 - ** ---------- .> transport: redis://wusen0601.xyz:6379/2 - ** ---------- .> results: redis://wusen0601.xyz:6379/1 - *** --- * --- .> concurrency: 4 (gevent) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery_task.send_email [2021-09-06 10:02:34,612: INFO/MainProcess] Connected to redis://wusen0601.xyz:6379/2 [2021-09-06 10:02:34,698: INFO/MainProcess] mingle: searching for neighbors [2021-09-06 10:02:35,993: INFO/MainProcess] mingle: all alone [2021-09-06 10:02:36,233: INFO/MainProcess] celery@DESKTOP-L7PAGFQ ready. [2021-09-06 10:02:36,253: INFO/MainProcess] pidbox: Connected to redis://wusen0601.xyz:6379/2. [2021-09-06 10:02:50,378: INFO/MainProcess] Task celery_task.send_email[6e786d9d-629a-46f3-a647-3aacaa1 773b8] received [2021-09-06 10:02:50,380: WARNING/MainProcess] 给wusen发邮件。。。。。 [2021-09-06 10:02:50,380: WARNING/MainProcess] [2021-09-06 10:02:50,408: INFO/MainProcess] Task celery_task.send_email[a63bdc18-4cde-4f84-9558-b6c5f17 2de3f] received [2021-09-06 10:02:50,410: WARNING/MainProcess] 给wusen发邮件。。。。。 [2021-09-06 10:02:50,411: WARNING/MainProcess] [2021-09-06 10:02:55,381: WARNING/MainProcess] 给wusen发邮件完成 [2021-09-06 10:02:55,384: WARNING/MainProcess] [2021-09-06 10:02:55,412: WARNING/MainProcess] 给wusen发邮件完成 [2021-09-06 10:02:55,412: WARNING/MainProcess]
View Code
版本:win10 + py3.7 + celery5.1.2
理解:celery高版本对我们很不友好(pip install gevent)
2.celery
组成部分:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)
使用场景:异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
3.优点
Simple(简单) Celery 使用和维护都非常简单,并且不需要配置文件。 Highly Available(高可用) woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。 Fast(快速) 单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时) Flexible(灵活) Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
View Code
4.异步任务
4.1简单结构
4.1.1创建异步任务执行文件celery_task:
import celery import time backend = "redis://wusen0601.xyz:6379/1" broker = "redis://wusen0601.xyz:6379/2" cel = celery.Celery("test",backend=backend,broker=broker) @cel.task def send_email(name): print(f"给{name}发邮件。。。。。") time.sleep(5) print(f"给{name}发邮件完成") return "OK"
View Code
4.1.2创建执行任务文件,produce_task.py:
from icecream import ic from celery_task import send_email result1 = send_email.delay("wusen") ic(result1.id) result2 = send_email.delay("wusen") ic(result2.id)
View Code
4.1.3创建py文件:result.py,查看任务执行结果,
from celery.result import AsyncResult from celery_task import cel async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
View Code
4.2多任务模式
4.2.1启动方式:你要在项目根目录下面去启动
celery -A ct.celery_task worker -l info -P gevent
4.2.2创建celery_task.py文件
from celery import Celery cel = Celery( "celery_demo", broker="redis://wusen0601.xyz:6379/1", backend="redis://wusen0601.xyz:6379/2", # 包含两个任务 include=[ "celery_tasks.task01", "celery_tasks.task02", ] ) # 时区 cel.conf.timezone = "Asia/Shanghai" # 是否使用UTC cel.conf.enable_utc = False
View Code
4.2.3创建任务一 task01.py
import time from .celery_task import cel @cel.task def send_email(res): time.sleep(5) return f"向{res}发邮件任务完成"
View Code
4.2.4创建任务二 task02.py
import time from .celery_task import cel @cel.task def send_msg(res): time.sleep(5) return f"向{res}发信息任务完成"
View Code
4.2.5创建生产者 produce_task.py
from .ct.task01 import send_email from .ct.task02 import send_msg # 立刻告知celery去执行test_celery任务,并传入一个参数 result = send_email.delay('wusen') print(result.id) result = send_msg.delay('wusen') print(result.id)
View Code
4.2.6创建检测结果check_result.py
from celery.result import AsyncResult from .ct.celery_task import cel async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除,执行完成,结果不会自动删除 # async.revoke(terminate=True) # 无论现在是什么时候,都要终止 # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
View Code
4.3定时任务之:简单模式(修改一下produce_task.py文件)
from celery_task import send_email from datetime import datetime # 方式一 # v1 = datetime(2020, 3, 11, 16, 19, 00) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = send_email.apply_async(args=["egon",], eta=v2) # print(result.id) # 方式二 ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = send_email.apply_async(args=["egon"], eta=task_time) print(result.id)