Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

 


 

 

 

测试环境

Win7 64位

 

Linux 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

功能需求

把不同的负载主机,注册为zookeeper的节点,其它应用模块请求zookeeper获取相关节点信息(服务器ip,端口号,服务器任务执行状态),通过服务器任务状态选择没有运行指定任务的服务器执行相关任务。

 

针对以上需求,做一个技术预研,核心代码实现

 

 

实现思路

负载服务器启动时,初始化zookeeper客户端,创建tcp服务器,注册节点信息到zookeeper服务器(信息包含tcp服务器ip,端口,负载服务器任务执行状态),然后定时检测负载服务器任务执行状态(通过检测某个进程的名称是否存在进行判断),其它应用模块通过zookeeper获取节点信息后,通过tcp socket通信,向负载服务器发送执行命令,然后负载服务器根据这些命令进行不同的处理。

 

 

代码实践(关键技术点实现)

代码模块组织结构

 

 

 

配置文件解析

conf/agent.conf

[AGENT]

interval = 5

proc = sftp-server

 

 

[README]

interval = 更新服务器节点信息频率(单位 秒

proc = 需要检测的进程名称(程序通过查找对应进程名称来判断负载程序是否还在运行,从而判断服务器状态

 

conf/tcpserver.conf

[TCPSERVER]

host=10.202.7.165

port = 8000

 

[README]

host = tcp服务器主机地址

port = tcp服务器监听端口

 

conf/zookeeper.conf

[ZOOKEEPER]

hosts = 10.118.52.26:2181

nodeParentPath=/rootNode

 

[README]

hosts = zookeeper地址,如果是集群地址,即有多个,用英文逗号分隔

nodeParentPath=负载机节点所在父级路径

 

MyTCPServer.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socketserver

from log import logger

 

 

class MyTCPHandler(socketserver.BaseRequestHandler):

   """

   The RequestHandler class for our server.

 

   It is instantiated once per connection to the server, and must

   override the handle() method to implement communication to the

   client.

   """

 

   def handle(self):

       while True:

           # self.request is the TCP socket connected to the client

           self.data = self.request.recv(1024).decode('utf-8').strip()

           logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

           if self.data == 'bye':

               self.request.sendall(bytes('bye', encoding='utf-8'))

               self.request.close()

               break

           else:

               self.request.sendall(self.data.upper().encode('utf-8'))

 

class MyTCPServer:

   def __init__(self, host, port):

       try:

           self.host = host

           self.port = port

 

           # Create the server, binding to self.host on port 'self.port'

           self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)

       except Exception as e:

           logger.error('初始化TCPServer失败:%s' % e)

           exit(1)

 

   def start(self):

       # Activate the server; this will keep running until you interrupt the program with Ctrl-C

       self.server.serve_forever()

 

MyTCPClient.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socket

import configparser

import time

 

from log import logger

 

 

if __name__ == '__main__':

   if_sock_connected = False

   try:

       config_parser = configparser.ConfigParser()

       config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')

       host = config_parser.get('TCPSERVER', 'host')

       port = int(config_parser.get('TCPSERVER', 'port'))

 

       # Create a socket (SOCK_STREAM means a TCP socket)

       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

 

       # Connect to server and send data

       sock.connect((host, port))

 

       if_sock_connected = True # 标记socket是否已连接

       i = 0

 

       while i < 10000:

           if i == 1000:

               sock.sendall(bytes('bye\n', "utf-8"))

           else:

               sock.sendall(bytes('hello world with tcp\n', "utf-8"))

 

           # Receive data from the server

           received = str(sock.recv(1024), "utf-8")

           logger.info('receive data from server:%s' % received)

           if received == 'bye':

               break

 

           time.sleep(5)

 

           i += 1

   except Exception as e:

       logger.error('程序运行出错:%s' % e)

   finally:

       if if_sock_connected:

           sock.close()

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
28天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
170 73
|
3天前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
13天前
|
Python
课程设计项目之基于Python实现围棋游戏代码
游戏进去默认为九路玩法,当然也可以选择十三路或是十九路玩法 使用pycharam打开项目,pip安装模块并引用,然后运行即可, 代码每行都有详细的注释,可以做课程设计或者毕业设计项目参考
54 33
|
11天前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
26 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
6天前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
7天前
|
分布式计算 数据处理 MaxCompute
云产品评测|分布式Python计算服务MaxFrame
云产品评测|分布式Python计算服务MaxFrame
34 2
|
14天前
|
JavaScript API C#
【Azure Developer】Python代码调用Graph API将外部用户添加到组,结果无效,也无错误信息
根据Graph API文档,在单个请求中将多个成员添加到组时,Python代码示例中的`members@odata.bind`被错误写为`members@odata_bind`,导致用户未成功添加。
37 10
|
11天前
|
人工智能 分布式计算 数据处理
有奖评测,基于分布式 Python 计算服务 MaxFrame 进行数据处理
阿里云MaxCompute MaxFrame推出分布式Python计算服务MaxFrame评测活动,助力开发者高效完成大规模数据处理、可视化探索及ML/AI开发。活动时间为2024年12月17日至2025年1月31日,参与者需体验MaxFrame并发布评测文章,有机会赢取精美礼品。
|
23天前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
46 9
|
25天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
62 2