A simple distributed task scheduling system based on Python and ZooKeeper: design approach and core code
Test environment
Win7 64-bit
Linux 64-bit
Python 3.3.4
kazoo-2.6.1-py2.py3-none-any.whl (Windows)
kazoo-2.6.1.tar.gz (Linux)
https://pypi.org/project/kazoo/#files
zookeeper-3.4.13.tar.gz
Download links:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
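Functional requirements
Register each load host as a ZooKeeper node. Other application modules query ZooKeeper for the node information (server IP, port, and task execution status) and use the status to pick a server that is not currently running the specified task, then have that server execute it.
This article is a technical spike for the requirement above and covers only the core code.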
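The selection step in the requirement above can be sketched as follows. This is only an illustration: the original does not specify how node data is encoded, so the JSON layout, the field names `ip`, `port`, and `status`, the status values `idle` and `busy`, and the second address `10.202.7.166` are all assumptions.

```python
import json


def pick_idle_server(node_payloads):
    """Return (ip, port) of the first node whose status is 'idle', or None."""
    for raw in node_payloads:
        info = json.loads(raw.decode('utf-8'))
        if info.get('status') == 'idle':
            return info['ip'], info['port']
    return None


# Payloads as they might be read from the child nodes in ZooKeeper
# (hypothetical data; the JSON layout is an assumption).
nodes = [
    b'{"ip": "10.202.7.165", "port": 8000, "status": "busy"}',
    b'{"ip": "10.202.7.166", "port": 8000, "status": "idle"}',
]
print(pick_idle_server(nodes))
```

Returning `None` when every server is busy leaves the retry or queueing policy to the caller.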
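Implementation approach
When a load server starts, it initializes a ZooKeeper client, creates a TCP server, and registers its node information with the ZooKeeper server (the TCP server IP and port, plus the load server's task execution status). It then periodically checks the task execution status by testing whether a process with a given name exists. Other application modules obtain the node information from ZooKeeper, connect to the load server over a TCP socket, and send it commands, which the load server handles according to the command received.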
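The registration step might look roughly like the sketch below. It assumes `client` is an already started `kazoo.client.KazooClient` and uses only its `ensure_path`, `create`, and `set` methods. The helper names, the JSON payload layout, and the `server` node-name prefix are hypothetical. Using an ephemeral node is also an assumption, chosen because ZooKeeper removes ephemeral nodes when the owning session ends, so a crashed load server disappears from the list.

```python
import json


def build_payload(ip, port, status):
    """Encode node information as UTF-8 JSON bytes (layout is an assumption)."""
    return json.dumps({'ip': ip, 'port': port, 'status': status}).encode('utf-8')


def register_node(client, parent_path, payload):
    """Create an ephemeral, sequential child node and return its real path."""
    client.ensure_path(parent_path)
    return client.create(parent_path + '/server', payload,
                         ephemeral=True, sequence=True)


def update_status(client, node_path, ip, port, status):
    """Overwrite the node data after a periodic status check."""
    client.set(node_path, build_payload(ip, port, status))


# Possible usage with a real ZooKeeper server (not executed here):
# from kazoo.client import KazooClient
# zk = KazooClient(hosts='10.118.52.26:2181')
# zk.start()
# path = register_node(zk, '/rootNode',
#                      build_payload('10.202.7.165', 8000, 'idle'))
```

The agent would then call `update_status` once every `interval` seconds with the result of its process check.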
Code practice (implementation of the key technical points)
Code module structure
Configuration files
conf/agent.conf
[AGENT]
interval = 5
proc = sftp-server
[README]
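interval = how often the server node information is updated (in seconds)
proc = name of the process to check (the program looks up this process name to decide whether the load program is still running, and from that determines the server status)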
conf/tcpserver.conf
[TCPSERVER]
host=10.202.7.165
port = 8000
[README]
host = address of the TCP server host
port = port the TCP server listens on
conf/zookeeper.conf
[ZOOKEEPER]
hosts = 10.118.52.26:2181
nodeParentPath=/rootNode
[README]
hosts = ZooKeeper address; for a cluster with several addresses, separate them with commas
nodeParentPath=parent path under which the load host nodes are created
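As a rough sketch of how these files could be read, the snippet below parses the same sections with the standard `configparser` module. It parses inline strings so it runs without the `conf/` directory; the helper name `read_section` is hypothetical, and a real agent would more likely call `parser.read(path, encoding='utf-8-sig')` on each file.

```python
import configparser


def read_section(text, section):
    """Parse INI text and return one section as a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser[section])


AGENT_CONF = """
[AGENT]
interval = 5
proc = sftp-server
"""

ZK_CONF = """
[ZOOKEEPER]
hosts = 10.118.52.26:2181
nodeParentPath=/rootNode
"""

agent = read_section(AGENT_CONF, 'AGENT')
interval = int(agent['interval'])      # values are strings until converted
zk = read_section(ZK_CONF, 'ZOOKEEPER')
hosts = zk['hosts']
# ConfigParser lower-cases option names by default, so the key
# is 'nodeparentpath' when the section is copied into a dict.
parent = zk['nodeparentpath']
print(interval, agent['proc'], hosts, parent)
```

Calling `parser.get('ZOOKEEPER', 'nodeParentPath')` works with the original casing, because `get` applies the same lower-casing to the name it is given.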
MyTCPServer.py
#!/usr/bin/env python3
# -*- encoding:utf-8 -*-

__author__ = 'shouke'

import socketserver
import sys

from log import logger


class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The RequestHandler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        while True:
            # self.request is the TCP socket connected to the client
            raw = self.request.recv(1024)
            if not raw:
                # An empty read means the client closed the connection
                break
            self.data = raw.decode('utf-8').strip()
            logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))
            if self.data == 'bye':
                self.request.sendall(bytes('bye', encoding='utf-8'))
                self.request.close()
                break
            else:
                self.request.sendall(self.data.upper().encode('utf-8'))


class MyTCPServer:
    def __init__(self, host, port):
        try:
            self.host = host
            self.port = port
            # Create the server, binding to self.host on port 'self.port'
            self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)
        except Exception as e:
            logger.error('Failed to initialize TCPServer: %s' % e)
            sys.exit(1)

    def start(self):
        # Activate the server; this will keep running until you interrupt the program with Ctrl-C
        self.server.serve_forever()
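To try the request/reply protocol without the `log` module or the config files, a self-contained round trip could look like the sketch below. It is not the author's code: the handler is a trimmed copy of the logic above, the names `EchoUpperHandler` and `run_round_trip` are hypothetical, and it assumes Python 3.6 or later (for using the server as a context manager) and a usable loopback interface. `ThreadingTCPServer` is swapped in so that each client gets its own thread, whereas the plain `TCPServer` above serves one connection at a time.

```python
import socket
import socketserver
import threading


class EchoUpperHandler(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            raw = self.request.recv(1024)
            if not raw:                      # client closed the connection
                break
            text = raw.decode('utf-8').strip()
            if text == 'bye':
                self.request.sendall(b'bye')
                break
            self.request.sendall(text.upper().encode('utf-8'))


def run_round_trip(messages):
    """Start a server on a free local port, send each message, collect replies."""
    with socketserver.ThreadingTCPServer(('127.0.0.1', 0), EchoUpperHandler) as server:
        worker = threading.Thread(target=server.serve_forever, daemon=True)
        worker.start()
        replies = []
        try:
            with socket.create_connection(server.server_address) as sock:
                for message in messages:
                    sock.sendall(message.encode('utf-8'))
                    replies.append(sock.recv(1024).decode('utf-8'))
        finally:
            server.shutdown()                # stop serve_forever()
        return replies


print(run_round_trip(['hello world with tcp\n', 'bye\n']))
```

Binding to port `0` lets the operating system pick a free port, which avoids clashing with a server already listening on 8000.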
MyTCPClient.py
#!/usr/bin/env python3
# -*- encoding:utf-8 -*-

__author__ = 'shouke'

import configparser
import socket
import time

from log import logger

if __name__ == '__main__':
    if_sock_connected = False
    try:
        config_parser = configparser.ConfigParser()
        config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
        host = config_parser.get('TCPSERVER', 'host')
        port = int(config_parser.get('TCPSERVER', 'port'))

        # Create a socket (SOCK_STREAM means a TCP socket)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Connect to server and send data
        sock.connect((host, port))
        if_sock_connected = True  # the socket is now connected

        i = 0
        while i < 10000:
            if i == 1000:
                sock.sendall(bytes('bye\n', "utf-8"))
            else:
                sock.sendall(bytes('hello world with tcp\n', "utf-8"))

            # Receive data from the server
            received = str(sock.recv(1024), "utf-8")
            logger.info('receive data from server:%s' % received)
            if received == 'bye':
                break

            time.sleep(5)
            i += 1
    except Exception as e:
        logger.error('Program failed: %s' % e)
    finally:
        if if_sock_connected:
            sock.close()
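Both programs above treat one `recv(1024)` call as one message. TCP is a byte stream, so a message can arrive split across reads or glued to the next one. The client already ends each command with `\n`, so the receiving side could frame on newlines, as in the sketch below. The helper `recv_line` is hypothetical. Using it for server replies would also require the server to append a newline to each reply, which the code above does not do.

```python
import socket


def recv_line(sock, buffer=b''):
    """Read until a newline arrives; return (line, leftover bytes).

    If the peer closes first, whatever was buffered is returned as the line.
    """
    while b'\n' not in buffer:
        chunk = sock.recv(1024)
        if not chunk:                 # peer closed the connection
            return buffer.decode('utf-8'), b''
        buffer += chunk
    line, _, rest = buffer.partition(b'\n')
    return line.decode('utf-8'), rest


# Demonstration on a local socket pair: the first command is split across
# two sends and the second one is glued onto the end of it.
left, right = socket.socketpair()
left.sendall(b'hello wor')
left.sendall(b'ld\nbye\n')
first, pending = recv_line(right)
second, pending = recv_line(right, pending)
left.close()
right.close()
print(first, second)
```

Passing the leftover bytes back in on the next call keeps data that arrived early from being lost.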