python—Celery异步分布式

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

一、Celery异步分布式

Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

Celery  用于存储消息以及celery执行的一些消息和结果


对于brokers,官方推荐是rabbitmq和redis

对于backend,也就是指数据库,为了简单一般使用redis


clipboard.png


使用redis连接url格式:

redis://:password@hostname:port/db_number


1)定义连接脚本tasks.py


1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
from  celery  import  Celery
broker  =  "redis://192.168.2.230:6379/1"
backend  =  "redis://192.168.2.230:6379/2"
app  =  Celery( "tasks" , broker = broker, backend = backend)
 
@app .task
def  add(x,y):
     return  x + y


2)安装启动celery

pip install celery

pip install redis

启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

1.png


3)执行测试 huang.py 

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
from  tasks  import  add
 
re  =  add.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
 
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态

运行结果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS


4)根据成功返回的key或celery界面输出的信息,查看redis存储

blob.png


说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的



二、celery多进程

1.png

1)配置文件 celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  kombu  import  Exchange,Queue
 
BROKER_URL  =  "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND  =  "redis://192.168.2.230:6379/4"
 
CELERY_QUEUES  =  (
Queue( "default" ,Exchange( "default" ),routing_key = "default" ),
Queue( "for_task_A" ,Exchange( "for_task_A" ),routing_key = "for_task_A" ),
Queue( "for_task_B" ,Exchange( "for_task_B" ),routing_key = "for_task_B" )
)
 
CELERY_ROUTES  =  {
'tasks.taskA' :{ "queue" : "for_task_A" , "routing_key" : "for_task_A" },
'tasks.taskB' :{ "queue" : "for_task_B" , "routing_key" : "for_task_B" }
}


2)tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  celery  import  Celery
 
app  =  Celery()
app.config_from_object( "celeryconfig" )
 
@app .task
     def  taskA(x,y):
     return  x + y
     
@app .task
     def  taskB(x,y,z):
     return  x + y + z


3)启动celery

celery -A tasks worker --loglevel info


4)执行脚本huang2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  tasks  import  taskA,taskB
 
re  =  taskA.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态
 
re2  =  taskB.delay( 10 , 20 , 30 )
print (re2.result)
print (re2.ready)
print (re2.get(timeout = 1 ))
print (re2.status)


5)运行结果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS











本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2052713,如需转载请自行联系原作者
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
数据采集 Java Python
python并发编程:Python异步IO实现并发爬虫
python并发编程:Python异步IO实现并发爬虫
24 1
|
29天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
14天前
|
调度 数据库 Python
【专栏】异步IO在处理IO密集型任务中的高效性
【4月更文挑战第27天】本文介绍了Python并发编程和异步IO,包括并发的基本概念(多线程、多进程、协程),线程与进程的实现(threading和multiprocessing模块),协程的使用(asyncio模块),以及异步IO的原理和优势。强调了异步IO在处理IO密集型任务中的高效性,指出应根据任务类型选择合适的并发技术。
|
5天前
|
API UED Python
使用Python进行异步HTTP请求的实践指南
使用Python进行异步HTTP请求的实践指南
19 4
|
12天前
|
并行计算 数据处理 开发者
Python并发编程:解析异步IO与多线程
本文探讨了Python中的并发编程技术,着重比较了异步IO和多线程两种常见的并发模型。通过详细分析它们的特点、优劣势以及适用场景,帮助读者更好地理解并选择适合自己项目需求的并发编程方式。
|
15天前
|
人工智能 算法 调度
uvloop,一个强大的 Python 异步IO编程库!
uvloop,一个强大的 Python 异步IO编程库!
24 2
|
16天前
|
API 调度 开发者
Python中的并发编程:使用asyncio库实现异步IO
传统的Python编程模式中,使用多线程或多进程实现并发操作可能存在性能瓶颈和复杂性问题。而随着Python 3.5引入的asyncio库,开发者可以利用异步IO来更高效地处理并发任务。本文将介绍如何利用asyncio库实现异步IO,提升Python程序的并发性能。
|
22天前
|
数据采集 缓存 算法
使用Python打造爬虫程序之Python中的并发与异步IO:解锁高效数据处理之道
【4月更文挑战第19天】本文探讨了Python中的并发与异步IO,区分了并发(同时处理任务)与并行(同时执行任务)的概念。Python的多线程受限于GIL,适合IO密集型任务,而多进程适用于CPU密集型任务。异步IO通过非阻塞和回调/协程实现高效IO,Python的asyncio库提供了支持。应用场景包括Web开发和网络爬虫等。实践指南包括理解任务类型、使用asyncio、避免阻塞操作、合理设置并发度和优化性能。理解并运用这些技术能提升Python程序的效率和性能。
|
24天前
|
开发者 Python
Python中的并发编程:使用asyncio模块实现异步任务
传统的Python编程中,使用多线程或多进程进行并发操作时,常常会面临性能瓶颈和资源竞争的问题。而随着Python 3.5版本的引入,asyncio模块为开发者提供了一种基于协程的异步编程方式。本文将介绍如何使用asyncio模块实现异步任务,提高Python程序的并发处理能力。
|
24天前
|
监控 NoSQL 测试技术
python使用Flask,Redis和Celery的异步任务
python使用Flask,Redis和Celery的异步任务