Celery的实践指南

简介:
Celery的实践指南
celery原理:
celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。
典型的场景为:
  1. 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
  2. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。
 
实践中的典型场景:
  1. 简单的定时任务:
    1. 替换crontab的celery写法:

      1. from celery import Celery
        from celery.schedules import crontab

        app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

        app.conf.update(CELERYBEAT_SCHEDULE = {
            "add": {
                "task": "celery_demo.add",
                "schedule": crontab(minute="*"),
                "args": (16, 16)
            },
        })

        @app.task
        def add(x, y):
            return x + y

    2. 运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
      1. `celery -A celery_demo worker`
    3. 运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
      1. `celery -A celery_demo beat`
    4. 安装并运行flower,方便监控task的运行状态
      1. `celery flower -A celery_demo`
      2. 或者设置登录密码 `
        celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
  2. 多同步任务-链式任务-
  3. 失败自动重试的task
    1. 失败重试方法: 将task代码函数参数增加self,同时绑定bind。
    2. demo代码:
      1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
        def my_task_A(self):
            try:
                print("doing stuff here...")
            except SomeNetworkException as e:
                print("maybe do some clenup here....")
                self.retry(e)
    3. 自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
  4. 派发到不同Queue队列的task
    1. 一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。
      1. 比如:把queue的exchange和routing_key配置成通用模式:
      2. 再定义task的routing_key的名称:
    2. 可用的不同exchange策略:
      1. direct:直接根据定义routing_key
      2. topic:exchange会根据通配符来将一个消息推送到多个queue。
      3. fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
    3. 参考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  5. 高级配置
    1. result是否保存
    2. 失败邮件通知:
    3. 关闭rate limit:
  6. auto_reload方法(*nix系统):
    1. celery通过监控源代码目录的改动,自动地进行reload
    2. 使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
    3. 安装依赖:
      $ pip install pyinotify
    4. (可选) 指定fsNotify的依赖:
      $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
    5. 启动: celery -A appname worker --autoreload
  7. auto-scale方法:
    1. 启用auto-scale
    2. 临时增加worker进程数量(增加consumer):
      $ celery -A proj control add_consumer foo -d worker1.local
    3. 临时减少worker进程数量(减少consumer):
  8. 将scheduled task的配置从app.conf变成DB的方法:
    1. 需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。
      1.  celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
  9. 启动停止worker的方法:
    1. 启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
      1. root用户可以使用celeryd
      2. 非特权用户:celery multi start worker1 -A appName  —autoreload  --pidfile="HOME/run/celery/HOME/run/celery/HOME/log/celery/%n.log"
      3. 或者 celery worker —detach
    2. 停止
    3. ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
  10. 与Flask集成的方法
    1. 集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
    2. flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
    3. flask获得
  11. 与flask集成后的启动问题
    1. 由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
  12. 性能提升: eventlet 和 greenlet
 
 
 


本文转自fandyst 博客园博客,原文链接:http://www.cnblogs.com/ToDoToTry/    ,如需转载请自行联系原作者

相关文章
|
消息中间件 存储 NoSQL
一文读懂python分布式任务队列-celery
# 一文读懂Python分布式任务队列-Celery Celery是一个分布式任务执行框架,支持大量并发任务。它采用生产者-消费者模型,由Broker、Worker和Backend组成。生产者提交任务到队列,Worker异步执行,结果存储在Backend。适用于异步任务、大规模实时任务和定时任务。5月更文挑战第17天
1194 1
|
6月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
9月前
|
数据挖掘 数据处理 数据库
Pandas数据聚合:groupby与agg
Pandas库中的`groupby`和`agg`方法是数据分析中不可或缺的工具,用于数据分组与聚合计算。本文从基础概念、常见问题及解决方案等方面详细介绍这两个方法的使用技巧,涵盖单列聚合、多列聚合及自定义聚合函数等内容,并通过代码案例进行说明,帮助读者高效处理数据。
623 32
|
SQL 数据库 索引
Pandas之DataFrame,快速入门,迅速掌握(三)
Pandas之DataFrame,快速入门,迅速掌握(三)
214 0
|
11月前
|
JavaScript 前端开发 开发工具
开发者如何使用网盘与相册服务PDS
【10月更文挑战第18天】开发者如何使用网盘与相册服务PDS
393 2
|
11月前
|
NoSQL 调度 Redis
Celery
【10月更文挑战第10天】
207 4
|
关系型数据库 MySQL 数据库
django.db.utils.operationalerror:\xF0\x9F\x9....报错解决办法
在将Hexo的.md文件迁移至Django博客时遇到 OperationalError: (1366, "Incorrect string value"),原因是.md文件包含MySQL不支持的4字节UTF8字符。解决方案:1) 将数据库和字段的字符集改为utf8mb4;2) 在Django的MySQL连接串中设置字符集为utf8mb4。问题解决后,顺利插入数据。更多详情见博客:[http://xiejava.ishareread.com/](http://xiejava.ishareread.com/)
255 2
|
NoSQL Java Redis
Springboot整合redis(一般人都能看懂的Lettuce版本)
去年学习的Redis,刚刚学习完就迫不及待的在实战中用了一下,走了很多坑不过幸好都填上了,需求的不断变化发现用不上Redis,一开始去掉了,后来想想加进来比较合适。这篇文章主要讲解Springboot如何整合开发Redis实现一个基本的案例。使用的是目前Springboot2.x的Lettuce版本。希望对你有帮助。
1186 1
|
API Python
Python Web框架:Django、Flask和FastAPI巅峰对决
Python Web框架:Django、Flask和FastAPI巅峰对决
13975 1
|
缓存 移动开发 Java
统一日志的处理Slf4j,log4j,logback
统一日志的处理Slf4j,log4j,logback
355 0