Python自定义主从分布式架构

简介:

0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

1、图:

wKiom1R-e2ry4GwEAAEVjb9davM954.jpg

2、代码:

(1)、中心节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
 
import  SocketServer, socket, Queue
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
CenterClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #中心节点用于发送网络消息的socket
TaskQueue  =  Queue.Queue()  #任务队列
 
#获取任务队列
def  GetTaskQueue():
     for  in  range ( 1 11 ):
         TaskQueue.put( str (i))
 
#CenterServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print (data)
         
         if  data.startswith( 'wait' ):   
             vec  =  data.split( ':' )
             if  len (vec) ! =  3 :
                 print ( 'Error: len(vec) != 3' )
             else :
                 nodeIP  =  vec[ 1 ]
                 nodeListenPort  =  vec[ 2 ]
                 nodeID  =  nodeIP  +  ':'  +  nodeListenPort
                 if  not  TaskQueue.empty():
                     task  =  TaskQueue.get()
                     print ( 'send task '  +  task  +  ' to '  +  nodeID)
                     CenterClient.sendto( 'task:'  +  task, (nodeIP,  int (nodeListenPort)))
                 else :
                     print ( 'TaskQueue is empty!' )
 
GetTaskQueue()   #获取任务队列
 
CenterServer  =  SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print ( 'Listen port '  +  str (CenterListenPort)  +  ' ...' )
CenterServer.serve_forever()

(2)、任务节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
 
import  time, socket, SocketServer
from  apscheduler.scheduler  import  Scheduler
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
NodeIP  =  socket.gethostbyname(socket.gethostname())    #任务节点自身IP
NodeClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)     #任务节点用于发送网络消息的socket
 
#任务:发送网络信息
def  jobSendNetMsg():
     msg  =  ''
     if  NodeServer.TaskState  = =  'wait' :
         msg  =  'wait:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     elif  NodeServer.TaskState  = =  'exec' :
         msg  =  'exec:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     
     print (msg)
     NodeClient.sendto(msg, (CenterIP, CenterListenPort)) 
 
#添加并启动定时任务
def  InitTimer():
     sched  =  Scheduler()
     sched.add_interval_job(jobSendNetMsg, seconds = 1 )
     sched.start()
     
#执行任务
def  ExecTask(task):
     print ( 'ExecTask '  +  task  +  ' ...' )
     time.sleep( 2 )
     print ( 'ExecTask '  +  task  +  ' over' )
 
#NodeServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print ( 'recv data: '  +  data)
         
         if  data.startswith( 'task' ):
             vec  =  data.split( ':' )
             if  len (vec) ! =  2 :
                 print ( 'Error: len(vec) != 2' )
             else :
                 task  =  vec[ 1 ]
                 self .server.TaskState  =  'exec'
                 ExecTask(task)
                 self .server.TaskState  =  'wait'
 
InitTimer()
                 
NodeServer  =  SocketServer.UDPServer(('',  0 ), MyUDPHandler)
NodeServer.TaskState  =  'wait'  #(exec/wait)
NodeListenPort  =  NodeServer.server_address[ 1 ]
print ( 'NodeListenPort:'  +  str (NodeListenPort))
NodeServer.serve_forever()


*** walker * 2014-12-03 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1585826如需转载请自行联系原作者

RQSLT
相关文章
|
1月前
|
运维 负载均衡 安全
深度解析:Python Web前后端分离架构中WebSocket的选型与实现策略
深度解析:Python Web前后端分离架构中WebSocket的选型与实现策略
96 0
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
191 5
|
3月前
|
Python
探索Python中的魔法方法:打造你自己的自定义对象
【8月更文挑战第29天】在Python的世界里,魔法方法如同神秘的咒语,它们赋予了对象超常的能力。本文将带你一探究竟,学习如何通过魔法方法来定制你的对象行为,让你的代码更具魔力。
43 5
|
5天前
|
存储 JSON API
如何自定义Python环境变量?
如何自定义Python环境变量?
18 3
|
24天前
|
运维 供应链 安全
SD-WAN分布式组网:构建高效、灵活的企业网络架构
本文介绍了SD-WAN(软件定义广域网)在企业分布式组网中的应用,强调其智能化流量管理、简化的网络部署、弹性扩展能力和增强的安全性等核心优势,以及在跨国企业、多云环境、零售连锁和制造业中的典型应用场景。通过合理设计网络架构、选择合适的网络连接类型、优化应用流量优先级和定期评估网络性能等最佳实践,SD-WAN助力企业实现高效、稳定的业务连接,加速数字化转型。
SD-WAN分布式组网:构建高效、灵活的企业网络架构
|
29天前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
1月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
2月前
|
jenkins 持续交付 Docker
docker之自定义制作镜像(python程序)
docker之自定义制作镜像(python程序)