celery 使用指南 - 1

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。

celery是什么



Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个职程(Worker),最后由职程(Worker)进行执行中间人(Broker)分配的任务。


Celery 可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。

Celery 是用Python编写的,但是协议可以通过任何语言进行实现。


转载自官方文档www.celerycn.io/ru-men/cele…


安装 celery



celery支持多种backend,我们选择redis。安装命令如下:


python3 -m venv env
source env/bin/activate
pip install -U "celery[redis]"


完成后,项目的包如下:


amqp==2.5.1
billiard==3.6.1.0
celery==4.3.0
importlib-metadata==0.23
kombu==4.6.4
more-itertools==7.2.0
pytz==2019.2
redis==3.3.8
vine==1.3.0
zipp==0.6.0


使用"celery[redis]"会同时安装celery和redis


使用 celery 任务



1. 启动redis容器


docker run -it -p 6379:6379 redis:5.0.4-alpine


2. 编写 celery 项目


项目目录结构如下:


├── project0
│   ├── __init__.py
│   ├── app.py
│   ├── env
│   ├── settings.py
│   └── tasks.py


app.py 中声明 celery


# -*- coding:utf-8 -*-
from celery import Celery
app = Celery('demo')
app.config_from_object('settings')
if __name__ == '__main__':
    app.start()


使用 settings.py 配置backend等:


BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_TIMEZONE = 'Asia/Shanghai'   # 时区配置
CELERY_IMPORTS = (
     'tasks',
)


tasks.py 中定义hello任务:


from app import app
@app.task
def hello(word):
    print("hello:", time.ctime(), word)
    return "hello,{}".format(word)


3. 启动 celery worker


(env) ➜  project0 celery -A app worker -l info
celery@YoodeMac-mini.local v4.3.0 (rhubarb)
Darwin-18.6.0-x86_64-i386-64bit 2019-09-24 16:20:55
[config]
.> app:         demo:0x108085160
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.hello
[2019-09-24 16:20:56,134: INFO/MainProcess] Connected to redis://localhost:6379/0
[2019-09-24 16:20:56,159: INFO/MainProcess] mingle: searching for neighbors
[2019-09-24 16:20:57,199: INFO/MainProcess] mingle: all alone
[2019-09-24 16:20:57,236: INFO/MainProcess] celery@YoodeMac-mini.local ready.


可见tasks.hello已经注册到任务列表[tasks]中。


4. 启动任务


(env) ➜  project0 python
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>>
>>> t = tasks.hello.delay("world")


同时在worker中可以看到任务执行的情况:


...
[2019-09-24 16:24:05,726: WARNING/ForkPoolWorker-8] hello:
[2019-09-24 16:24:05,727: WARNING/ForkPoolWorker-8] Tue Sep 24 16:24:05 2019
[2019-09-24 16:24:05,728: WARNING/ForkPoolWorker-8] world
...


在终端中,可以看到task执行的状态和结果:


>>> t
<AsyncResult: d8f80147-c2c8-419e-ac9b-e317e7533cf1>
>>> t.get()
'hello,world'
>>> t.info
'hello,world'
>>> t.result
'hello,world'
>>> t.task_id
'd8f80147-c2c8-419e-ac9b-e317e7533cf1'
>>> t.successful()
True
>>> t.state
'SUCCESS'


通过上述实验,可见celery是分布式的生产者-消费者模型。worker进程负责消费(执行任务),生产进程负责生产(启动任务),redis负责消息传递和结果保存。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 NoSQL Redis
【译】Celery文档1:First Steps with Celery——安装和配置Celery
【译】Celery文档1:First Steps with Celery——安装和配置Celery
69 14
|
3月前
|
消息中间件 存储 前端开发
【译】Celery文档2:Next Steps——在项目中使用Celery
【译】Celery文档2:Next Steps——在项目中使用Celery
|
3月前
|
数据库 Python
【译】Celery文档3:在Django中使用Celery
【译】Celery文档3:在Django中使用Celery
|
4月前
|
存储 监控 Linux
Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
【2月更文挑战第7天】Airflow【部署 01】Airflow官网Quick Start实操(一篇学会部署Airflow)
397 1
|
消息中间件 存储 监控
【实测】django的超轻量级消息队列:django-task-mq 使用教程
【实测】django的超轻量级消息队列:django-task-mq 使用教程
|
消息中间件 缓存 前端开发
神器 celery 源码解析 - 8
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
298 0
神器 celery 源码解析 - 8
|
消息中间件 缓存 算法
神器celery 源码解析 - 4
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
248 0
神器celery 源码解析 - 4
|
消息中间件 网络协议 NoSQL
神器 celery 源码解析 - 2
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
291 0
神器 celery 源码解析 - 2
|
消息中间件 算法 前端开发
神器 celery 源码解析 - 7
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
351 0
神器 celery 源码解析 - 7
|
消息中间件 存储 监控
神器 celery 源码解析 - 5
Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 也是一款消息队列工具,可用于处理实时数据以及任务调度。
722 0
神器 celery 源码解析 - 5