python—Celery异步分布式

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

一、Celery异步分布式

Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

Celery  用于存储消息以及celery执行的一些消息和结果


对于brokers,官方推荐是rabbitmq和redis

对于backend,也就是指数据库,为了简单一般使用redis


clipboard.png


使用redis连接url格式:

redis://:password@hostname:port/db_number


1)定义连接脚本tasks.py


1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
from  celery  import  Celery
broker  =  "redis://192.168.2.230:6379/1"
backend  =  "redis://192.168.2.230:6379/2"
app  =  Celery( "tasks" , broker = broker, backend = backend)
 
@app .task
def  add(x,y):
     return  x + y


2)安装启动celery

pip install celery

pip install redis

启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

1.png


3)执行测试 huang.py 

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
from  tasks  import  add
 
re  =  add.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
 
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态

运行结果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS


4)根据成功返回的key或celery界面输出的信息,查看redis存储

blob.png


说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的



二、celery多进程

1.png

1)配置文件 celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  kombu  import  Exchange,Queue
 
BROKER_URL  =  "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND  =  "redis://192.168.2.230:6379/4"
 
CELERY_QUEUES  =  (
Queue( "default" ,Exchange( "default" ),routing_key = "default" ),
Queue( "for_task_A" ,Exchange( "for_task_A" ),routing_key = "for_task_A" ),
Queue( "for_task_B" ,Exchange( "for_task_B" ),routing_key = "for_task_B" )
)
 
CELERY_ROUTES  =  {
'tasks.taskA' :{ "queue" : "for_task_A" , "routing_key" : "for_task_A" },
'tasks.taskB' :{ "queue" : "for_task_B" , "routing_key" : "for_task_B" }
}


2)tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  celery  import  Celery
 
app  =  Celery()
app.config_from_object( "celeryconfig" )
 
@app .task
     def  taskA(x,y):
     return  x + y
     
@app .task
     def  taskB(x,y,z):
     return  x + y + z


3)启动celery

celery -A tasks worker --loglevel info


4)执行脚本huang2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  tasks  import  taskA,taskB
 
re  =  taskA.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态
 
re2  =  taskB.delay( 10 , 20 , 30 )
print (re2.result)
print (re2.ready)
print (re2.get(timeout = 1 ))
print (re2.status)


5)运行结果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS











本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2052713,如需转载请自行联系原作者
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
分布式计算 DataWorks 大数据
分布式Python计算服务MaxFrame测评
一文带你入门分布式Python计算服务MaxFrame
84 23
分布式Python计算服务MaxFrame测评
|
20天前
|
人工智能 开发者 Python
Chainlit:一个开源的异步Python框架,快速构建生产级对话式 AI 应用
Chainlit 是一个开源的异步 Python 框架,帮助开发者在几分钟内构建可扩展的对话式 AI 或代理应用,支持多种工具和服务集成。
122 9
|
1月前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
1月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
74 7
|
1月前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
48 1
|
1月前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
1月前
|
Python
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
66 7
|
1月前
|
分布式计算 数据处理 MaxCompute
云产品评测|分布式Python计算服务MaxFrame
云产品评测|分布式Python计算服务MaxFrame
72 2
|
1月前
|
SQL 网络协议 安全
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
|
4月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?

热门文章

最新文章