Python自定义主从分布式架构

简介:

0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

1、图:

wKiom1R-e2ry4GwEAAEVjb9davM954.jpg

2、代码:

(1)、中心节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
 
import  SocketServer, socket, Queue
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
CenterClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #中心节点用于发送网络消息的socket
TaskQueue  =  Queue.Queue()  #任务队列
 
#获取任务队列
def  GetTaskQueue():
     for  in  range ( 1 11 ):
         TaskQueue.put( str (i))
 
#CenterServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print (data)
         
         if  data.startswith( 'wait' ):   
             vec  =  data.split( ':' )
             if  len (vec) ! =  3 :
                 print ( 'Error: len(vec) != 3' )
             else :
                 nodeIP  =  vec[ 1 ]
                 nodeListenPort  =  vec[ 2 ]
                 nodeID  =  nodeIP  +  ':'  +  nodeListenPort
                 if  not  TaskQueue.empty():
                     task  =  TaskQueue.get()
                     print ( 'send task '  +  task  +  ' to '  +  nodeID)
                     CenterClient.sendto( 'task:'  +  task, (nodeIP,  int (nodeListenPort)))
                 else :
                     print ( 'TaskQueue is empty!' )
 
GetTaskQueue()   #获取任务队列
 
CenterServer  =  SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print ( 'Listen port '  +  str (CenterListenPort)  +  ' ...' )
CenterServer.serve_forever()

(2)、任务节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
 
import  time, socket, SocketServer
from  apscheduler.scheduler  import  Scheduler
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
NodeIP  =  socket.gethostbyname(socket.gethostname())    #任务节点自身IP
NodeClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)     #任务节点用于发送网络消息的socket
 
#任务:发送网络信息
def  jobSendNetMsg():
     msg  =  ''
     if  NodeServer.TaskState  = =  'wait' :
         msg  =  'wait:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     elif  NodeServer.TaskState  = =  'exec' :
         msg  =  'exec:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     
     print (msg)
     NodeClient.sendto(msg, (CenterIP, CenterListenPort)) 
 
#添加并启动定时任务
def  InitTimer():
     sched  =  Scheduler()
     sched.add_interval_job(jobSendNetMsg, seconds = 1 )
     sched.start()
     
#执行任务
def  ExecTask(task):
     print ( 'ExecTask '  +  task  +  ' ...' )
     time.sleep( 2 )
     print ( 'ExecTask '  +  task  +  ' over' )
 
#NodeServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print ( 'recv data: '  +  data)
         
         if  data.startswith( 'task' ):
             vec  =  data.split( ':' )
             if  len (vec) ! =  2 :
                 print ( 'Error: len(vec) != 2' )
             else :
                 task  =  vec[ 1 ]
                 self .server.TaskState  =  'exec'
                 ExecTask(task)
                 self .server.TaskState  =  'wait'
 
InitTimer()
                 
NodeServer  =  SocketServer.UDPServer(('',  0 ), MyUDPHandler)
NodeServer.TaskState  =  'wait'  #(exec/wait)
NodeListenPort  =  NodeServer.server_address[ 1 ]
print ( 'NodeListenPort:'  +  str (NodeListenPort))
NodeServer.serve_forever()


*** walker * 2014-12-03 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1585826如需转载请自行联系原作者

RQSLT
相关文章
|
2月前
|
设计模式 SQL 人工智能
Python设计模式:从代码复用到系统架构的实践指南
本文以Python为实现语言,深入解析23种经典设计模式的核心思想与实战技巧。通过真实项目案例,展示设计模式在软件开发中的结构化思维价值,涵盖创建型、结构型、行为型三大类别,并结合Python动态语言特性,探讨模式的最佳应用场景与实现方式,帮助开发者写出更清晰、易维护的高质量代码。
81 1
|
2月前
|
设计模式 人工智能 算法
Python设计模式:从代码复用到系统架构的实践指南
本文探讨了电商系统中因支付方式扩展导致代码臃肿的问题,引出设计模式作为解决方案。通过工厂模式、策略模式、单例模式等经典设计,实现代码解耦与系统扩展性提升。结合Python语言特性,展示了模块化、装饰器、适配器等模式的实战应用,并延伸至AI时代的设计创新,帮助开发者构建高内聚、低耦合、易维护的软件系统。
255 0
|
2月前
|
NoSQL Java Redis
基于Redisson和自定义注解的分布式锁实现策略。
在实现分布式锁时,保证各个组件配置恰当、异常处理充足、资源清理彻底是至关重要的。这样保障了在分布布局场景下,锁的正确性和高效性,使得系统的稳健性得到增强。通过这种方式,可以有效预防并发环境下的资源冲突问题。
155 29
|
4月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
1188 57
|
8月前
|
存储 缓存 NoSQL
分布式系统架构8:分布式缓存
本文介绍了分布式缓存的理论知识及Redis集群的应用,探讨了AP与CP的区别,Redis作为AP系统具备高性能和高可用性但不保证强一致性。文章还讲解了透明多级缓存(TMC)的概念及其优缺点,并详细分析了memcached和Redis的分布式实现方案。此外,针对缓存穿透、击穿、雪崩和污染等常见问题提供了应对策略,强调了Cache Aside模式在解决数据一致性方面的作用。最后指出,面试中关于缓存的问题多围绕Redis展开,建议深入学习相关知识点。
573 8
|
4月前
|
消息中间件 缓存 算法
分布式开发:数字时代的高性能架构革命-为什么要用分布式?优雅草卓伊凡
分布式开发:数字时代的高性能架构革命-为什么要用分布式?优雅草卓伊凡
219 0
分布式开发:数字时代的高性能架构革命-为什么要用分布式?优雅草卓伊凡
|
6月前
|
并行计算 PyTorch 算法框架/工具
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
446 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
|
6月前
|
设计模式 机器学习/深度学习 前端开发
Python 高级编程与实战:深入理解设计模式与软件架构
本文深入探讨了Python中的设计模式与软件架构,涵盖单例、工厂、观察者模式及MVC、微服务架构,并通过实战项目如插件系统和Web应用帮助读者掌握这些技术。文章提供了代码示例,便于理解和实践。最后推荐了进一步学习的资源,助力提升Python编程技能。
|
6月前
|
人工智能 运维 监控
领先AI企业经验谈:探究AI分布式推理网络架构实践
当前,AI行业正处于快速发展的关键时期。继DeepSeek大放异彩之后,又一款备受瞩目的AI智能体产品Manus横空出世。Manus具备独立思考、规划和执行复杂任务的能力,其多智能体架构能够自主调用工具。在GAIA基准测试中,Manus的性能超越了OpenAI同层次的大模型,展现出卓越的技术实力。
|
6月前
|
缓存 Shell 开发工具
[oeasy]python071_我可以自己做一个模块吗_自定义模块_引入模块_import_diy
本文介绍了 Python 中模块的导入与自定义模块的创建。首先,我们回忆了模块的概念,即封装好功能的部件,并通过导入 `__hello__` 模块实现了输出 "hello world!" 的功能。接着,尝试创建并编辑自己的模块 `my_file.py`,引入 `time` 模块以获取当前时间,并在其中添加自定义输出。
97 6

热门文章

最新文章

推荐镜像

更多