0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。
1、图:
2、代码:
(1)、中心节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
import
SocketServer, socket, Queue
# IP address of the center node.
CenterIP = '127.0.0.1'

# UDP port the center node listens on.
CenterListenPort = 9999

# Datagram socket the center node uses to send network messages.
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Queue of tasks waiting to be handed out.
TaskQueue = Queue.Queue()
def GetTaskQueue():
    """Fill the module-level TaskQueue with the task ids '1' through '10'."""
    for task_id in map(str, range(1, 11)):
        TaskQueue.put(task_id)
class MyUDPHandler(SocketServer.BaseRequestHandler):
    """Callback of CenterServer, invoked for every UDP datagram received.

    A datagram of the form ``wait:<ip>:<port>`` is treated as a task request:
    the next task is taken from TaskQueue and sent to ``<ip>:<port>`` as
    ``task:<task>`` through CenterClient. Any datagram that does not start
    with ``wait`` is only printed.
    """

    def handle(self):
        """Print the datagram and, for a valid request, dispatch one task."""
        # self.request[0] is the datagram payload.
        data = self.request[0].strip()
        print(data)
        if not data.startswith('wait'):
            return

        vec = data.split(':')
        if len(vec) != 3:
            print('Error: len(vec) != 3')
            return

        nodeIP, nodeListenPort = vec[1], vec[2]

        # Validate the port BEFORE taking a task off the queue, so that a
        # malformed request cannot consume (and thereby lose) a task.
        try:
            port = int(nodeListenPort)
        except ValueError:
            port = -1
        if not 0 < port <= 65535:
            print('Error: invalid port ' + nodeListenPort)
            return

        try:
            # Non-blocking: the handler must never stall on an empty queue.
            task = TaskQueue.get_nowait()
        except Queue.Empty:
            print('TaskQueue is empty!')
            return

        nodeID = nodeIP + ':' + nodeListenPort
        print('send task ' + task + ' to ' + nodeID)
        CenterClient.sendto('task:' + task, (nodeIP, port))
# Populate the task queue before any request can be served.
GetTaskQueue()

# Bind the UDP server; MyUDPHandler is called for each incoming datagram.
listen_address = (CenterIP, CenterListenPort)
CenterServer = SocketServer.UDPServer(listen_address, MyUDPHandler)

listen_banner = 'Listen port ' + str(CenterListenPort) + ' ...'
print(listen_banner)
CenterServer.serve_forever()
|
(2)、任务节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
import
time, socket, SocketServer
from
apscheduler.scheduler
import
Scheduler
# IP address of the center node.
CenterIP = '127.0.0.1'

# UDP port the center node listens on.
CenterListenPort = 9999

# This task node's own IP address, resolved from its host name.
NodeIP = socket.gethostbyname(socket.gethostname())

# Datagram socket the task node uses to send network messages.
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def jobSendNetMsg():
    """Send this node's current state to the center node as one UDP datagram.

    The datagram is ``<state>:<NodeIP>:<NodeListenPort>``, where ``<state>``
    is NodeServer.TaskState ('wait' or 'exec'). The message is also printed.
    Nothing is printed or sent for any other state value.
    """
    # Read the state exactly once. MyUDPHandler.handle switches it between
    # 'exec' and 'wait' around each task; comparing the attribute twice could
    # miss both branches if it changed in between, producing an empty datagram.
    state = NodeServer.TaskState
    if state not in ('wait', 'exec'):
        return

    msg = state + ':' + NodeIP + ':' + str(NodeListenPort)
    print(msg)
    NodeClient.sendto(msg, (CenterIP, CenterListenPort))
def InitTimer():
    """Register jobSendNetMsg as a one-second interval job and start the scheduler."""
    heartbeat = Scheduler()
    heartbeat.add_interval_job(jobSendNetMsg, seconds=1)
    heartbeat.start()
def ExecTask(task):
    """Simulate running *task*: print a start line, sleep 2 seconds, print an end line.

    task -- task identifier as a string; it is concatenated into the messages.
    """
    label = 'ExecTask ' + task
    print(label + ' ...')
    time.sleep(2)
    print(label + ' over')
class MyUDPHandler(SocketServer.BaseRequestHandler):
    """Callback of NodeServer, invoked for every UDP datagram received.

    A datagram of the form ``task:<task>`` is executed via ExecTask. While it
    runs, the server's TaskState attribute is 'exec'; afterwards it is set
    back to 'wait'. Any datagram that does not start with ``task`` is only
    printed.
    """

    def handle(self):
        """Print the datagram and, if it carries a task, execute that task."""
        # self.request[0] is the datagram payload.
        data = self.request[0].strip()
        print('recv data: ' + data)
        if not data.startswith('task'):
            return

        vec = data.split(':')
        if len(vec) != 2:
            print('Error: len(vec) != 2')
            return

        task = vec[1]
        self.server.TaskState = 'exec'
        try:
            ExecTask(task)
        finally:
            # Always return to 'wait', even if the task raised; otherwise the
            # node would report 'exec' forever and never request work again.
            self.server.TaskState = 'wait'
# Create the server before starting the timer: jobSendNetMsg reads both
# NodeServer.TaskState and NodeListenPort, so they must exist before the
# periodic job can fire.
# Binding to port 0 and reading server_address back yields the port in use.
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait'  # one of 'exec' / 'wait'
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))

# Start the periodic state report, then serve incoming task datagrams.
InitTimer()
NodeServer.serve_forever()
|
*** walker * 2014-12-03 ***
本文转自 walker snapshot 的 51CTO 博客,原文链接:http://blog.51cto.com/walkerqt/1585826 。如需转载,请自行联系原作者。
RQSLT