python—Celery异步分布式

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

一、Celery异步分布式

Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

Celery  用于存储消息以及celery执行的一些消息和结果


对于brokers,官方推荐是rabbitmq和redis

对于backend,也就是指数据库,为了简单一般使用redis


clipboard.png


使用redis连接url格式:

redis://:password@hostname:port/db_number


1)定义连接脚本tasks.py


1
2
3
4
5
6
7
8
9
#!/usr/bin/env python
from  celery  import  Celery
broker  =  "redis://192.168.2.230:6379/1"
backend  =  "redis://192.168.2.230:6379/2"
app  =  Celery( "tasks" , broker = broker, backend = backend)
 
@app .task
def  add(x,y):
     return  x + y


2)安装启动celery

pip install celery

pip install redis

启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

1.png


3)执行测试 huang.py 

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
from  tasks  import  add
 
re  =  add.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
 
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态

运行结果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS


4)根据成功返回的key或celery界面输出的信息,查看redis存储

blob.png


说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的



二、celery多进程

1.png

1)配置文件 celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  kombu  import  Exchange,Queue
 
BROKER_URL  =  "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND  =  "redis://192.168.2.230:6379/4"
 
CELERY_QUEUES  =  (
Queue( "default" ,Exchange( "default" ),routing_key = "default" ),
Queue( "for_task_A" ,Exchange( "for_task_A" ),routing_key = "for_task_A" ),
Queue( "for_task_B" ,Exchange( "for_task_B" ),routing_key = "for_task_B" )
)
 
CELERY_ROUTES  =  {
'tasks.taskA' :{ "queue" : "for_task_A" , "routing_key" : "for_task_A" },
'tasks.taskB' :{ "queue" : "for_task_B" , "routing_key" : "for_task_B" }
}


2)tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  celery  import  Celery
 
app  =  Celery()
app.config_from_object( "celeryconfig" )
 
@app .task
     def  taskA(x,y):
     return  x + y
     
@app .task
     def  taskB(x,y,z):
     return  x + y + z


3)启动celery

celery -A tasks worker --loglevel info


4)执行脚本huang2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
from  tasks  import  taskA,taskB
 
re  =  taskA.delay( 10 , 20 )
 
print (re.result)    #任务返回值
print (re.ready)      #如果任务被执行返回True,其他情况返回False
print (re.get(timeout = 1 ))   #带参数的等待,最后返回结果
print (re.status)   #任务当前状态
 
re2  =  taskB.delay( 10 , 20 , 30 )
print (re2.result)
print (re2.ready)
print (re2.get(timeout = 1 ))
print (re2.status)


5)运行结果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS











本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2052713,如需转载请自行联系原作者
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
16天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
154 73
|
11天前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
41 9
|
13天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
49 2
|
14天前
|
人工智能 分布式计算 数据处理
云产品评测:分布式Python计算服务MaxFrame
云产品评测:分布式Python计算服务MaxFrame
49 3
|
29天前
|
数据采集 JSON 测试技术
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
40 3
|
8天前
|
分布式计算 数据处理 MaxCompute
分布式Python计算服务MaxFrame使用心得
大家好,我是V哥。MaxFrame是阿里云自研的分布式计算框架,专为Python开发者设计,支持大规模数据处理和AI模型开发。MaxFrame适用于快速进行数据处理、数据科学和交互式探索,支持按量付费及包年包月两种计费方式。通过两个案例(金融数据清洗和大语言模型预处理),展示了MaxFrame在大规模数据处理中的显著性能提升。安装MaxFrame客户端只需简单几步,轻松开启高效数据处理之旅。欢迎关注威哥爱编程,一起交流技术心得!
|
3月前
|
关系型数据库 MySQL 数据处理
探索Python中的异步编程:从asyncio到异步数据库操作
在这个快节奏的技术世界里,效率和性能是关键。本文将带你深入Python的异步编程世界,从基础的asyncio库开始,逐步探索到异步数据库操作的高级应用。我们将一起揭开异步编程的神秘面纱,探索它如何帮助我们提升应用程序的性能和响应速度。
|
3月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
2月前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
51 0
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?