Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

 


 

 

 

测试环境

Win7 64位

 

Linux 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

功能需求

把不同的负载主机,注册为zookeeper的节点,其它应用模块请求zookeeper获取相关节点信息(服务器ip,端口号,服务器任务执行状态),通过服务器任务状态选择没有运行指定任务的服务器执行相关任务。

 

针对以上需求,做一个技术预研,核心代码实现

 

 

实现思路

负载服务器启动时,初始化zookeeper客户端,创建tcp服务器,注册节点信息到zookeeper服务器(信息包含tcp服务器ip,端口,负载服务器任务执行状态),然后定时检测负载服务器任务执行状态(通过检测某个进程的名称是否存在进行判断),其它应用模块通过zookeeper获取节点信息后,通过tcp socket通信,向负载服务器发送执行命令,然后负载服务器根据这些命令进行不同的处理。

 

 

代码实践(关键技术点实现)

代码模块组织结构

 

 

 

配置文件解析

conf/agent.conf

[AGENT]

interval = 5

proc = sftp-server

 

 

[README]

interval = 更新服务器节点信息频率(单位 秒

proc = 需要检测的进程名称(程序通过查找对应进程名称来判断负载程序是否还在运行,从而判断服务器状态

 

conf/tcpserver.conf

[TCPSERVER]

host=10.202.7.165

port = 8000

 

[README]

host = tcp服务器主机地址

port = tcp服务器监听端口

 

conf/zookeeper.conf

[ZOOKEEPER]

hosts = 10.118.52.26:2181

nodeParentPath=/rootNode

 

[README]

hosts = zookeeper地址,如果是集群地址,即有多个,用英文逗号分隔

nodeParentPath=负载机节点所在父级路径

 

MyTCPServer.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socketserver

from log import logger

 

 

class MyTCPHandler(socketserver.BaseRequestHandler):

   """

   The RequestHandler class for our server.

 

   It is instantiated once per connection to the server, and must

   override the handle() method to implement communication to the

   client.

   """

 

   def handle(self):

       while True:

           # self.request is the TCP socket connected to the client

           self.data = self.request.recv(1024).decode('utf-8').strip()

           logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

           if self.data == 'bye':

               self.request.sendall(bytes('bye', encoding='utf-8'))

               self.request.close()

               break

           else:

               self.request.sendall(self.data.upper().encode('utf-8'))

 

class MyTCPServer:

   def __init__(self, host, port):

       try:

           self.host = host

           self.port = port

 

           # Create the server, binding to self.host on port 'self.port'

           self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)

       except Exception as e:

           logger.error('初始化TCPServer失败:%s' % e)

           exit(1)

 

   def start(self):

       # Activate the server; this will keep running until you interrupt the program with Ctrl-C

       self.server.serve_forever()

 

MyTCPClient.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socket

import configparser

import time

 

from log import logger

 

 

if __name__ == '__main__':

   if_sock_connected = False

   try:

       config_parser = configparser.ConfigParser()

       config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')

       host = config_parser.get('TCPSERVER', 'host')

       port = int(config_parser.get('TCPSERVER', 'port'))

 

       # Create a socket (SOCK_STREAM means a TCP socket)

       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

 

       # Connect to server and send data

       sock.connect((host, port))

 

       if_sock_connected = True # 标记socket是否已连接

       i = 0

 

       while i < 10000:

           if i == 1000:

               sock.sendall(bytes('bye\n', "utf-8"))

           else:

               sock.sendall(bytes('hello world with tcp\n', "utf-8"))

 

           # Receive data from the server

           received = str(sock.recv(1024), "utf-8")

           logger.info('receive data from server:%s' % received)

           if received == 'bye':

               break

 

           time.sleep(5)

 

           i += 1

   except Exception as e:

       logger.error('程序运行出错:%s' % e)

   finally:

       if if_sock_connected:

           sock.close()

目录
相关文章
|
5天前
|
算法 搜索推荐 JavaScript
基于python智能推荐算法的全屋定制系统
本研究聚焦基于智能推荐算法的全屋定制平台网站设计,旨在解决消费者在个性化定制中面临的选择难题。通过整合Django、Vue、Python与MySQL等技术,构建集家装设计、材料推荐、家具搭配于一体的一站式智能服务平台,提升用户体验与行业数字化水平。
|
3天前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
1月前
|
传感器 算法 安全
基于分布式模型预测控制DMPC的单向拓扑结构下异构车辆车队研究(Matlab代码实现)
基于分布式模型预测控制DMPC的单向拓扑结构下异构车辆车队研究(Matlab代码实现)
|
1月前
|
运维 监控 安全
【风险评估】分布式电源并网对电网的影响及风险评估的研究(Matlab代码实现)
【风险评估】分布式电源并网对电网的影响及风险评估的研究(Matlab代码实现)
|
1月前
|
机器学习/深度学习 并行计算 算法
基于目标级联法的微网群多主体分布式优化调度(Matlab代码实现)
基于目标级联法的微网群多主体分布式优化调度(Matlab代码实现)
|
5天前
|
JavaScript 前端开发 关系型数据库
基于python的电子商务管理系统
本研究聚焦电商管理系统的设计与实现,采用Flask+Vue.js前后端分离架构,结合MySQL数据库与B/S模式,提升系统性能与可扩展性。针对传统电商系统效率低、维护难等问题,提出高效、安全的解决方案,涵盖商品管理、订单处理与用户体验优化,推动电商技术向智能化、可持续化发展。
|
2天前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的台风灾害分析及预测系统
针对台风灾害预警滞后、精度不足等问题,本研究基于Python与大数据技术,构建多源数据融合的台风预测系统。利用机器学习提升路径与强度预测准确率,结合Django框架实现动态可视化与实时预警,为防灾决策提供科学支持,显著提高应急响应效率,具有重要社会经济价值。

推荐镜像

更多