1、编写函数celery_app.py
from celery import Celery import time broker = "redis://localhost:6379/1" backend = "redis://localhost:6379/2" app = Celery("my_task", broker=broker, backend=backend) @app.task(name="task") def add(a, b): print("coming...") time.sleep(5) return a + b if __name__ == '__main__': result = add(1, 2) print(result)
2、启动worker
$ celery worker -A celery_app -l INFO
参数:
A: app文件名称
l:日志级别
3、启动任务
> from celery_app import add > result = add.delay(3, 4) > result.ready() > result.get()
4、工程化使用
目录结构
├── app.py └── celery_app ├── __init__.py ├── celeryconfig.py ├── task1.py └── task2.py
实例化Celery __init__.py
# -*- coding: utf-8 -*- from celery import Celery app = Celery(__file__) # 加载配置模块 app.config_from_object("celery_app.celeryconfig")
配置文件 celeryconfig.py
# -*- encoding:utf-8 -*- # celery配置文件 BROKER_URL = 'redis://localhost:6379/1' CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' # 设置时区,默认UTC CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ENABLE_UTC = True # 导入指定的任务模块 CELERY_IMPORTS = ( "celery_app.task1", "celery_app.task2" )
任务文件 task1.py
# -*- coding: utf-8 -*- import time from celery_app import app @app.task def add(x, y): time.sleep(3) return x + y
任务文件 task2.py
# -*- coding: utf-8 -*- import time from celery_app import app @app.task def multiply(x, y): time.sleep(5) return x * y
启动worker
$ celery worker -A celery_app -l INFO
5、定时任务
可以直接配置到配置文件中 celeryconfig.py
from datetime import timedelta from celery.schedules import crontab # 设置定时任务 CELERYBEAT_SCHEDULE = { "task1": { "task": "celery_app.task1.add", "schedule": timedelta(seconds=10), "args": (2, 8) }, "task2": { "task": "celery_app.task1.add", "schedule": crontab(hour=14, minute=59), "args": (2, 8) } }
启动定时任务
$ celery beat -A celery_app -l INFO
celery 4.1.0 时区bug -> 4.0.2
一条命令启动异步任务和定时任务
$ celery -B -A celery_app worker -l INFO