Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1

简介: Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现

 


 

 

 

测试环境

Win7 64位

 

Linux 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

功能需求

把不同的负载主机,注册为zookeeper的节点,其它应用模块请求zookeeper获取相关节点信息(服务器ip,端口号,服务器任务执行状态),通过服务器任务状态选择没有运行指定任务的服务器执行相关任务。

 

针对以上需求,做一个技术预研,核心代码实现

 

 

实现思路

负载服务器启动时,初始化zookeeper客户端,创建tcp服务器,注册节点信息到zookeeper服务器(信息包含tcp服务器ip,端口,负载服务器任务执行状态),然后定时检测负载服务器任务执行状态(通过检测某个进程的名称是否存在进行判断),其它应用模块通过zookeeper获取节点信息后,通过tcp socket通信,向负载服务器发送执行命令,然后负载服务器根据这些命令进行不同的处理。

 

 

代码实践(关键技术点实现)

代码模块组织结构

 

 

 

配置文件解析

conf/agent.conf

[AGENT]

interval = 5

proc = sftp-server

 

 

[README]

interval = 更新服务器节点信息频率(单位 秒

proc = 需要检测的进程名称(程序通过查找对应进程名称来判断负载程序是否还在运行,从而判断服务器状态

 

conf/tcpserver.conf

[TCPSERVER]

host=10.202.7.165

port = 8000

 

[README]

host = tcp服务器主机地址

port = tcp服务器监听端口

 

conf/zookeeper.conf

[ZOOKEEPER]

hosts = 10.118.52.26:2181

nodeParentPath=/rootNode

 

[README]

hosts = zookeeper地址,如果是集群地址,即有多个,用英文逗号分隔

nodeParentPath=负载机节点所在父级路径

 

MyTCPServer.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socketserver

from log import logger

 

 

class MyTCPHandler(socketserver.BaseRequestHandler):

   """

   The RequestHandler class for our server.

 

   It is instantiated once per connection to the server, and must

   override the handle() method to implement communication to the

   client.

   """

 

   def handle(self):

       while True:

           # self.request is the TCP socket connected to the client

           self.data = self.request.recv(1024).decode('utf-8').strip()

           logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))

           if self.data == 'bye':

               self.request.sendall(bytes('bye', encoding='utf-8'))

               self.request.close()

               break

           else:

               self.request.sendall(self.data.upper().encode('utf-8'))

 

class MyTCPServer:

   def __init__(self, host, port):

       try:

           self.host = host

           self.port = port

 

           # Create the server, binding to self.host on port 'self.port'

           self.server = socketserver.TCPServer((self.host, self.port), MyTCPHandler)

       except Exception as e:

           logger.error('初始化TCPServer失败:%s' % e)

           exit(1)

 

   def start(self):

       # Activate the server; this will keep running until you interrupt the program with Ctrl-C

       self.server.serve_forever()

 

MyTCPClient.py

 

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import socket

import configparser

import time

 

from log import logger

 

 

if __name__ == '__main__':

   if_sock_connected = False

   try:

       config_parser = configparser.ConfigParser()

       config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')

       host = config_parser.get('TCPSERVER', 'host')

       port = int(config_parser.get('TCPSERVER', 'port'))

 

       # Create a socket (SOCK_STREAM means a TCP socket)

       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

 

       # Connect to server and send data

       sock.connect((host, port))

 

       if_sock_connected = True # 标记socket是否已连接

       i = 0

 

       while i < 10000:

           if i == 1000:

               sock.sendall(bytes('bye\n', "utf-8"))

           else:

               sock.sendall(bytes('hello world with tcp\n', "utf-8"))

 

           # Receive data from the server

           received = str(sock.recv(1024), "utf-8")

           logger.info('receive data from server:%s' % received)

           if received == 'bye':

               break

 

           time.sleep(5)

 

           i += 1

   except Exception as e:

       logger.error('程序运行出错:%s' % e)

   finally:

       if if_sock_connected:

           sock.close()

目录
相关文章
|
8月前
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
1033 34
|
3月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
4月前
|
负载均衡 算法 调度
基于遗传算法的新的异构分布式系统任务调度算法研究(Matlab代码实现)
基于遗传算法的新的异构分布式系统任务调度算法研究(Matlab代码实现)
235 11
|
4月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
276 2
|
4月前
|
机器学习/深度学习 算法 安全
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
176 3
|
6月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
268 1
分布式新闻数据采集系统的同步效率优化实战
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
981 7
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
406 7
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
729 4
构建高可用性GraphRAG系统:分布式部署与容错机制

推荐镜像

更多