Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

 


 

 

 

测试环境

Win7 64位

 

Linux 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

功能需求

把不同的负载主机,注册为zookeeper的节点,其它应用模块请求zookeeper获取相关节点信息(服务器ip,端口号,服务器任务执行状态),通过服务器任务状态选择没有运行指定任务的服务器执行相关任务。

 

针对以上需求,做一个技术预研,核心代码实现

 

 

实现思路

负载服务器启动时,初始化zookeeper客户端,创建tcp服务器,注册节点信息到zookeeper服务器(信息包含tcp服务器ip,端口,负载服务器任务执行状态),然后定时检测负载服务器任务执行状态(通过检测某个进程的名称是否存在进行判断),其它应用模块通过zookeeper获取节点信息后,通过tcp socket通信,向负载服务器发送执行命令,然后负载服务器根据这些命令进行不同的处理。

 

 

代码实践(关键技术点实现)

代码模块组织结构

 

 

 

配置文件解析

conf/agent.conf

[AGENT]

interval = 5

proc = sftp-server

 

 

[README]

interval = 更新服务器节点信息频率(单位 秒

proc = 需要检测的进程名称(程序通过查找对应进程名称来判断负载程序是否还在运行,从而判断服务器状态

 

conf/tcpserver.conf

[TCPSERVER]

host=10.202.7.165

port = 8000

 

[README]

host = tcp服务器主机地址

port = tcp服务器监听端口

 

conf/zookeeper.conf

[ZOOKEEPER]

hosts = 10.118.52.26:2181

nodeParentPath=/rootNode

 

[README]

hosts = zookeeper地址,如果是集群地址,即有多个,用英文逗号分隔

nodeParentPath=负载机节点所在父级路径

 

MyTCPServer.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socketserver

from log import logger

 

 

class MyTCPHandler(socketserver.BaseRequestHandler):

   """

   The RequestHandler class for our server.

 

   It is instantiated once per connection to the server, and must

   override the handle() method to implement communication to the

   client.

   """

 

   def handle(self):

       while True:

           # self.request is the TCP socket connected to the client

           self.data = self.request.recv(1024).decode('utf-8').strip()

           logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

           if self.data == 'bye':

               self.request.sendall(bytes('bye', encoding='utf-8'))

               self.request.close()

               break

           else:

               self.request.sendall(self.data.upper().encode('utf-8'))

 

class MyTCPServer:

   def __init__(self, host, port):

       try:

           self.host = host

           self.port = port

 

           # Create the server, binding to self.host on port 'self.port'

           self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)

       except Exception as e:

           logger.error('初始化TCPServer失败:%s' % e)

           exit(1)

 

   def start(self):

       # Activate the server; this will keep running until you interrupt the program with Ctrl-C

       self.server.serve_forever()

 

MyTCPClient.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socket

import configparser

import time

 

from log import logger

 

 

if __name__ == '__main__':

   if_sock_connected = False

   try:

       config_parser = configparser.ConfigParser()

       config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')

       host = config_parser.get('TCPSERVER', 'host')

       port = int(config_parser.get('TCPSERVER', 'port'))

 

       # Create a socket (SOCK_STREAM means a TCP socket)

       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

 

       # Connect to server and send data

       sock.connect((host, port))

 

       if_sock_connected = True # 标记socket是否已连接

       i = 0

 

       while i < 10000:

           if i == 1000:

               sock.sendall(bytes('bye\n', "utf-8"))

           else:

               sock.sendall(bytes('hello world with tcp\n', "utf-8"))

 

           # Receive data from the server

           received = str(sock.recv(1024), "utf-8")

           logger.info('receive data from server:%s' % received)

           if received == 'bye':

               break

 

           time.sleep(5)

 

           i += 1

   except Exception as e:

       logger.error('程序运行出错:%s' % e)

   finally:

       if if_sock_connected:

           sock.close()

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
1月前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
眼疾识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了4种常见的眼疾图像数据集(白内障、糖尿病性视网膜病变、青光眼和正常眼睛) 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,实现用户上传一张眼疾图片识别其名称。
130 5
基于Python深度学习的眼疾识别系统实现~人工智能+卷积网络算法
|
2月前
|
机器学习/深度学习 人工智能 算法
猫狗宠物识别系统Python+TensorFlow+人工智能+深度学习+卷积网络算法
宠物识别系统使用Python和TensorFlow搭建卷积神经网络,基于37种常见猫狗数据集训练高精度模型,并保存为h5格式。通过Django框架搭建Web平台,用户上传宠物图片即可识别其名称,提供便捷的宠物识别服务。
347 55
|
1月前
|
安全 前端开发 数据库
Python 语言结合 Flask 框架来实现一个基础的代购商品管理、用户下单等功能的简易系统
这是一个使用 Python 和 Flask 框架实现的简易代购系统示例,涵盖商品管理、用户注册登录、订单创建及查看等功能。通过 SQLAlchemy 进行数据库操作,支持添加商品、展示详情、库存管理等。用户可注册登录并下单,系统会检查库存并记录订单。此代码仅为参考,实际应用需进一步完善,如增强安全性、集成支付接口、优化界面等。
|
5天前
|
机器学习/深度学习 人工智能 算法
基于Python深度学习的【蘑菇识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
蘑菇识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了9种常见的蘑菇种类数据集【"香菇(Agaricus)", "毒鹅膏菌(Amanita)", "牛肝菌(Boletus)", "网状菌(Cortinarius)", "毒镰孢(Entoloma)", "湿孢菌(Hygrocybe)", "乳菇(Lactarius)", "红菇(Russula)", "松茸(Suillus)"】 再使用通过搭建的算法模型对数据集进行训练得到一个识别精度较高的模型,然后保存为为本地h5格式文件。最后使用Django框架搭建了一个Web网页平台可视化操作界面,
41 11
基于Python深度学习的【蘑菇识别】系统~卷积神经网络+TensorFlow+图像识别+人工智能
|
2月前
|
存储 缓存 监控
局域网屏幕监控系统中的Python数据结构与算法实现
局域网屏幕监控系统用于实时捕获和监控局域网内多台设备的屏幕内容。本文介绍了一种基于Python双端队列(Deque)实现的滑动窗口数据缓存机制,以处理连续的屏幕帧数据流。通过固定长度的窗口,高效增删数据,确保低延迟显示和存储。该算法适用于数据压缩、异常检测等场景,保证系统在高负载下稳定运行。 本文转载自:https://www.vipshare.com
132 66
|
30天前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
|
2月前
|
机器学习/深度学习 人工智能 算法
【宠物识别系统】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+图像识别
宠物识别系统,本系统使用Python作为主要开发语言,基于TensorFlow搭建卷积神经网络算法,并收集了37种常见的猫狗宠物种类数据集【'阿比西尼亚猫(Abyssinian)', '孟加拉猫(Bengal)', '暹罗猫(Birman)', '孟买猫(Bombay)', '英国短毛猫(British Shorthair)', '埃及猫(Egyptian Mau)', '缅因猫(Maine Coon)', '波斯猫(Persian)', '布偶猫(Ragdoll)', '俄罗斯蓝猫(Russian Blue)', '暹罗猫(Siamese)', '斯芬克斯猫(Sphynx)', '美国斗牛犬
220 29
【宠物识别系统】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+图像识别
|
1月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
50 7
|
2月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
1月前
|
机器学习/深度学习 算法 前端开发
基于Python深度学习果蔬识别系统实现
本项目基于Python和TensorFlow,使用ResNet卷积神经网络模型,对12种常见果蔬(如土豆、苹果等)的图像数据集进行训练,构建了一个高精度的果蔬识别系统。系统通过Django框架搭建Web端可视化界面,用户可上传图片并自动识别果蔬种类。该项目旨在提高农业生产效率,广泛应用于食品安全、智能农业等领域。CNN凭借其强大的特征提取能力,在图像分类任务中表现出色,为实现高效的自动化果蔬识别提供了技术支持。
基于Python深度学习果蔬识别系统实现