Usage steps
1. Install
pip install django django-celery
2. Create the project
$ django-admin.py startproject celery_project
$ cd celery_project
$ python manage.py startapp course
Project layout
├── celery_project
│   ├── __init__.py
│   ├── celery_config.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── course
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
└── manage.py
3. Create a task
course/tasks.py
# -*- coding: utf-8 -*-
import time

from celery.task import Task


class CourseTask(Task):
    name = "course-task"

    def run(self, *args, **kwargs):
        print("start course task")
        time.sleep(4)  # simulate a slow operation
        print(args, kwargs)
        print("end course task")
4. Configure Celery
celery_project/celery_config.py
# -*- coding: utf-8 -*-
from datetime import timedelta

import djcelery

djcelery.setup_loader()

BROKER_BACKEND = "redis"
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

# Task queues
CELERY_QUEUES = {
    "beat_queue": {
        "exchange": "beat_queue",
        "exchange_type": "direct",
        "binding_key": "beat_queue"
    },
    "work_queue": {
        "exchange": "work_queue",
        "exchange_type": "direct",
        "binding_key": "work_queue"
    }
}

# Default queue
CELERY_DEFAULT_QUEUE = "work_queue"

# Modules to import so that their tasks are registered
CELERY_IMPORTS = (
    "course.tasks",
)

# Time zone (the default is UTC)
CELERY_TIMEZONE = 'Asia/Shanghai'

# Prevents deadlocks in some situations
CELERYD_FORCE_EXECV = True

# Number of concurrent worker processes
CELERYD_CONCURRENCY = 4

# Acknowledge a task only after it has run, so it can be retried if the worker dies
CELERY_ACKS_LATE = True

# Recycle each worker process after 100 tasks to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100

# Maximum run time of a single task, in seconds
CELERYD_TASK_TIME_LIMIT = 6 * 60
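A misspelled queue name is easy to miss, because nothing fails at import time. The following is a minimal sketch of a plain-Python sanity check for the queue settings above. `check_queue_config` and `check_queue_name` are hypothetical helpers written for this illustration, not part of Celery or django-celery, and the required keys are assumed from the dictionaries shown in this article.

```python
# Sketch only: plain-Python sanity checks for the queue settings shown above.
# check_queue_config and check_queue_name are hypothetical helpers, not Celery APIs.

CELERY_QUEUES = {
    "beat_queue": {
        "exchange": "beat_queue",
        "exchange_type": "direct",
        "binding_key": "beat_queue",
    },
    "work_queue": {
        "exchange": "work_queue",
        "exchange_type": "direct",
        "binding_key": "work_queue",
    },
}
CELERY_DEFAULT_QUEUE = "work_queue"

# Keys every queue entry in this article carries (an assumption of this sketch).
REQUIRED_KEYS = {"exchange", "exchange_type", "binding_key"}


def check_queue_config(queues, default_queue):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if default_queue not in queues:
        problems.append("default queue %r is not declared" % default_queue)
    for name, options in sorted(queues.items()):
        missing = REQUIRED_KEYS - set(options)
        if missing:
            problems.append("queue %r is missing %s" % (name, sorted(missing)))
    return problems


def check_queue_name(queues, name):
    """Return `name` if it is a declared queue, otherwise raise ValueError."""
    if name not in queues:
        raise ValueError(
            "unknown queue %r, expected one of %s" % (name, sorted(queues))
        )
    return name


if __name__ == "__main__":
    print(check_queue_config(CELERY_QUEUES, CELERY_DEFAULT_QUEUE))
    print(check_queue_name(CELERY_QUEUES, "work_queue"))
```

Such a check could run in a unit test, or be called on a queue name before it is passed to `apply_async`, so that a typo surfaces as an error instead of a task that no worker ever picks up.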
5. Configure settings
celery_project/settings.py
# Add the apps
INSTALLED_APPS = [
    ...
    'djcelery',
    'course'
]

# Pull in the Celery settings
from .celery_config import *
6. Write the view function
course/views.py
# -*- coding: utf-8 -*-
from django.http import JsonResponse

from course.tasks import CourseTask


def do(request):
    # Dispatch the task asynchronously; the view does not wait for it to finish
    print("start do")
    CourseTask.apply_async(args=("hello", ), queue="work_queue")
    print("end do")
    return JsonResponse({"result": "ok"})
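To show why the view returns before the task has finished, here is a standard-library analogy rather than Celery itself. In this sketch `queue.Queue` stands in for the Redis broker and a thread stands in for the worker process; `broker`, `worker`, `course_task`, and this `do` are hypothetical names for illustration only, and the sleep is shortened to 0.2 seconds.

```python
# Stdlib analogy only: queue.Queue stands in for the Redis broker and a
# thread stands in for the Celery worker. None of these names are Celery APIs.
import queue
import threading
import time

broker = queue.Queue()  # hypothetical stand-in for the "work_queue" queue
results = []            # records what the worker has processed


def course_task(*args, **kwargs):
    time.sleep(0.2)     # simulate a slow operation (the article sleeps 4 s)
    results.append((args, kwargs))


def worker():
    """Consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        args, kwargs = message
        course_task(*args, **kwargs)


def do():
    """Mimic the view: enqueue the work and return immediately."""
    broker.put((("hello",), {}))
    return {"result": "ok"}


if __name__ == "__main__":
    thread = threading.Thread(target=worker)
    thread.start()
    started = time.time()
    response = do()
    print(response, "returned after %.3f s" % (time.time() - started))
    broker.put(None)    # ask the worker to stop once the queue is drained
    thread.join()
    print(results)
```

The caller only pays for putting a message on the queue; the slow part runs elsewhere. With Celery the same split holds, except that the message travels through Redis and the worker is a separate process.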
7. Configure the URL route
celery_project/urls.py
from django.conf.urls import url

from course import views

urlpatterns = [
    url(r'^do/$', views.do)
]
8. Start the Django server and a worker
$ python manage.py help
$ python manage.py runserver
$ python manage.py celery worker -l INFO
# or, to consume only a given queue
$ python manage.py celery worker -Q queue
Test it by visiting: http://127.0.0.1:8000/do/
9. Set up a periodic task
celery_project/celery_config.py
# Periodic task schedule
CELERYBEAT_SCHEDULE = {
    "course-task": {
        "task": "course-task",
        "schedule": timedelta(seconds=5),
        "options": {
            "queue": "beat_queue"
        }
    }
}
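As a rough illustration of what a `timedelta` schedule means, the sketch below lists when the entry above would come due. `due_times` is a hypothetical helper, and the sketch assumes the first run happens one interval after the start time; celery beat has its own scheduler and this is not its implementation.

```python
# Sketch only: lists when a timedelta-based schedule entry would come due.
# due_times is a hypothetical helper; celery beat uses its own scheduler.
from datetime import datetime, timedelta

CELERYBEAT_SCHEDULE = {
    "course-task": {
        "task": "course-task",
        "schedule": timedelta(seconds=5),
        "options": {"queue": "beat_queue"},
    }
}


def due_times(entry, start, count):
    """Return the first `count` due times of `entry`, counted from `start`.

    Assumes the first run is one full interval after `start`.
    """
    interval = entry["schedule"]
    return [start + interval * n for n in range(1, count + 1)]


if __name__ == "__main__":
    start = datetime(2020, 1, 1, 12, 0, 0)  # arbitrary example start time
    for name, entry in CELERYBEAT_SCHEDULE.items():
        for due in due_times(entry, start, 3):
            print(name, "->", entry["options"]["queue"], "at", due.time())
```

Since the task in this article sleeps for 4 seconds and is scheduled every 5 seconds, a single worker process can just keep up; a longer task or a shorter interval would let messages pile up in beat_queue.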
10. Start beat
$ python manage.py celery beat -l INFO
Monitoring tool: flower
Install
pip install flower
Start
celery flower --address=0.0.0.0 --port=5555 --broker=redis://localhost:6379/1 --basic_auth=user:123
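The `--broker` value above has to point at the same Redis database as `BROKER_URL` in celery_config.py (database 1), while results are stored in database 2. A minimal sketch, using only the standard library, splits such URLs into host, port, and database number so the two can be compared. `redis_location` is a hypothetical helper, and treating a missing path as database 0 is an assumption of this sketch.

```python
# Sketch only: split a redis:// URL into (host, port, database number).
# redis_location is a hypothetical helper, not part of Celery, flower, or redis-py.
from urllib.parse import urlparse

BROKER_URL = "redis://localhost:6379/1"
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"


def redis_location(url):
    """Return (host, port, db) for a redis:// URL.

    A URL without a path is treated as database 0 (assumption of this sketch).
    """
    parsed = urlparse(url)
    db = int(parsed.path.lstrip("/") or 0)
    return parsed.hostname, parsed.port, db


if __name__ == "__main__":
    print("broker :", redis_location(BROKER_URL))
    print("results:", redis_location(CELERY_RESULT_BACKEND))
```

Keeping the broker and the result backend in separate databases, as the article's configuration does, makes it easier to inspect or flush one without touching the other.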
Start from within Django
python manage.py celery flower
Open the management UI at http://localhost:5555
Summary
Using Celery in Django adds only three steps:
1. Write the tasks
2. Configure Celery
3. Start the Celery worker