Python自定义主从分布式架构

简介:

0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

1、图:

wKiom1R-e2ry4GwEAAEVjb9davM954.jpg

2、代码:

(1)、中心节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
 
import  SocketServer, socket, Queue
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
CenterClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #中心节点用于发送网络消息的socket
TaskQueue  =  Queue.Queue()  #任务队列
 
#获取任务队列
def  GetTaskQueue():
     for  in  range ( 1 11 ):
         TaskQueue.put( str (i))
 
#CenterServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print (data)
         
         if  data.startswith( 'wait' ):   
             vec  =  data.split( ':' )
             if  len (vec) ! =  3 :
                 print ( 'Error: len(vec) != 3' )
             else :
                 nodeIP  =  vec[ 1 ]
                 nodeListenPort  =  vec[ 2 ]
                 nodeID  =  nodeIP  +  ':'  +  nodeListenPort
                 if  not  TaskQueue.empty():
                     task  =  TaskQueue.get()
                     print ( 'send task '  +  task  +  ' to '  +  nodeID)
                     CenterClient.sendto( 'task:'  +  task, (nodeIP,  int (nodeListenPort)))
                 else :
                     print ( 'TaskQueue is empty!' )
 
GetTaskQueue()   #获取任务队列
 
CenterServer  =  SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print ( 'Listen port '  +  str (CenterListenPort)  +  ' ...' )
CenterServer.serve_forever()

(2)、任务节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
 
import  time, socket, SocketServer
from  apscheduler.scheduler  import  Scheduler
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
NodeIP  =  socket.gethostbyname(socket.gethostname())    #任务节点自身IP
NodeClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)     #任务节点用于发送网络消息的socket
 
#任务:发送网络信息
def  jobSendNetMsg():
     msg  =  ''
     if  NodeServer.TaskState  = =  'wait' :
         msg  =  'wait:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     elif  NodeServer.TaskState  = =  'exec' :
         msg  =  'exec:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     
     print (msg)
     NodeClient.sendto(msg, (CenterIP, CenterListenPort)) 
 
#添加并启动定时任务
def  InitTimer():
     sched  =  Scheduler()
     sched.add_interval_job(jobSendNetMsg, seconds = 1 )
     sched.start()
     
#执行任务
def  ExecTask(task):
     print ( 'ExecTask '  +  task  +  ' ...' )
     time.sleep( 2 )
     print ( 'ExecTask '  +  task  +  ' over' )
 
#NodeServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print ( 'recv data: '  +  data)
         
         if  data.startswith( 'task' ):
             vec  =  data.split( ':' )
             if  len (vec) ! =  2 :
                 print ( 'Error: len(vec) != 2' )
             else :
                 task  =  vec[ 1 ]
                 self .server.TaskState  =  'exec'
                 ExecTask(task)
                 self .server.TaskState  =  'wait'
 
InitTimer()
                 
NodeServer  =  SocketServer.UDPServer(('',  0 ), MyUDPHandler)
NodeServer.TaskState  =  'wait'  #(exec/wait)
NodeListenPort  =  NodeServer.server_address[ 1 ]
print ( 'NodeListenPort:'  +  str (NodeListenPort))
NodeServer.serve_forever()


*** walker * 2014-12-03 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1585826如需转载请自行联系原作者

RQSLT
相关文章
|
15天前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
59 5
|
19天前
|
Python
探索Python中的魔法方法:打造你自己的自定义对象
【8月更文挑战第29天】在Python的世界里,魔法方法如同神秘的咒语,它们赋予了对象超常的能力。本文将带你一探究竟,学习如何通过魔法方法来定制你的对象行为,让你的代码更具魔力。
36 5
|
1月前
|
前端开发 Python
使用Python+openpyxl实现导出自定义样式的Excel文件
本文介绍了如何使用Python的openpyxl库导出具有自定义样式的Excel文件,包括设置字体、对齐方式、行列宽高、边框和填充等样式,并提供了完整的示例代码和运行效果截图。
26 1
使用Python+openpyxl实现导出自定义样式的Excel文件
|
1月前
|
存储 NoSQL Java
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
|
22天前
|
监控 安全 中间件
Python Django 后端架构开发: 中间件架构设计
Python Django 后端架构开发: 中间件架构设计
21 1
|
24天前
|
弹性计算 Cloud Native Windows
核心系统转型问题之核心系统需要转型到云原生分布式架构的原因如何解决
核心系统转型问题之核心系统需要转型到云原生分布式架构的原因如何解决
|
26天前
|
消息中间件 JSON 自然语言处理
Python多进程日志以及分布式日志的实现方式
python日志模块logging支持多线程,但是在多进程下写入日志文件容易出现下面的问题: PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。 也就是日志文件被占用的情况,原因是多个进程的文件handler对日志文件进行操作产生的。
|
1月前
|
Cloud Native 云计算 微服务
云原生时代:企业分布式应用架构的惊人蜕变,从SOA到微服务的大逃亡!
【8月更文挑战第8天】在云计算与容器技术推动下,企业分布式应用架构正经历从SOA到微服务再到云原生的深刻变革。SOA强调服务重用与组合,通过标准化接口实现服务解耦;微服务以细粒度划分服务,增强系统灵活性;云原生架构借助容器化与自动化技术简化部署与管理。每一步演进都为企业带来新的技术挑战与机遇。
81 6
|
17天前
|
UED Python
探索Python中的魔法方法:打造自定义字符串表示
【8月更文挑战第31天】在Python的世界里,魔法方法是那些以双下划线开头和结尾的特殊方法,它们为类提供了丰富的功能。本文将带你走进这些魔法方法的背后,特别是__str__和__repr__,揭示如何通过它们来定制我们的对象在被打印或转换为字符串时的外观。我们将从基础用法开始,逐步深入到高级技巧,包括继承与重写,最终实现一个优雅的字符串表示方案。准备好了吗?让我们开始这段代码之旅吧!
|
1月前
|
Kubernetes Python 容器
[python]使用diagrams绘制架构图
[python]使用diagrams绘制架构图