celery是什么
Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。
Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。
Celery 是用Python编写的,但是协议可以通过任何语言进行实现。
转载自官方文档www.celerycn.io/ru-men/cele…
安装 celery
celery支持多种backend,我们选择redis。安装命令如下:
python3 -m venv env source env/bin/activate pip install -U "celery[redis]"
完成后,项目的包如下:
amqp==2.5.1 billiard==3.6.1.0 celery==4.3.0 importlib-metadata==0.23 kombu==4.6.4 more-itertools==7.2.0 pytz==2019.2 redis==3.3.8 vine==1.3.0 zipp==0.6.0
使用"celery[redis]"
会同时安装celery和redis
使用 celery 任务
1. 启动redis容器
docker run -it -p 6379:6379 redis:5.0.4-alpine
2. 编写 celery 项目
项目目录结构如下:
├── project0 │ ├── __init__.py │ ├── app.py │ ├── env │ ├── settings.py │ └── tasks.py
app.py
中声明 celery
# -*- coding:utf-8 -*- from celery import Celery app = Celery('demo') app.config_from_object('settings') if __name__ == '__main__': app.start()
使用 settings.py
配置backend等:
BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置 CELERY_IMPORTS = ( 'tasks', )
tasks.py
中定义hello任务:
from app import app @app.task def hello(word): print("hello:", time.ctime(), word) return "hello,{}".format(word)
3. 启动 celery worker
(env) ➜ project0 celery -A app worker -l info celery@YoodeMac-mini.local v4.3.0 (rhubarb) Darwin-18.6.0-x86_64-i386-64bit 2019-09-24 16:20:55 [config] .> app: demo:0x108085160 .> transport: redis://localhost:6379/0 .> results: redis://localhost:6379/1 .> concurrency: 8 (prefork) .> task events: OFF (enable -E to monitor tasks in this worker) [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.hello [2019-09-24 16:20:56,134: INFO/MainProcess] Connected to redis://localhost:6379/0 [2019-09-24 16:20:56,159: INFO/MainProcess] mingle: searching for neighbors [2019-09-24 16:20:57,199: INFO/MainProcess] mingle: all alone [2019-09-24 16:20:57,236: INFO/MainProcess] celery@YoodeMac-mini.local ready.
可见tasks.hello已经注册到任务列表[tasks]中。
4. 启动任务
(env) ➜ project0 python Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28) [Clang 6.0 (clang-600.0.57)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> import tasks >>> >>> t = tasks.hello.delay("world")
同时在worker中可以看到任务执行的情况:
... [2019-09-24 16:24:05,726: WARNING/ForkPoolWorker-8] hello: [2019-09-24 16:24:05,727: WARNING/ForkPoolWorker-8] Tue Sep 24 16:24:05 2019 [2019-09-24 16:24:05,728: WARNING/ForkPoolWorker-8] world ...
在终端中,可以看到task执行的状态和结果:
>>> t <AsyncResult: d8f80147-c2c8-419e-ac9b-e317e7533cf1> >>> t.get() 'hello,world' >>> t.info 'hello,world' >>> t.result 'hello,world' >>> t.task_id 'd8f80147-c2c8-419e-ac9b-e317e7533cf1' >>> t.successful() True >>> t.state 'SUCCESS'
通过上述实验,可见celery是分布式的生产者-消费者模型。worker进程负责消费(执行任务),生产进程负责生产(启动任务),redis负责消息传递和结果保存。