Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 2

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现


appClient.py

 

#!/usr/bin/env python

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

from log import logger

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

 

def my_listener(state):

   if state == KazooState.LOST:

       logger.info('LOST')

 

       # Register somewhere that the session was lost

   elif state == KazooState.SUSPENDED:

       logger.info('SUSPENDED')

       # Handle being disconnected from Zookeeper

   else:

       logger.info('CONNECTED')

       # Handle being connected/reconnected to Zookeeper

 

def my_event_listener(event):

   logger.info(event)

 

 

zk_client = KazooClient(hosts='10.118.52.26:2181')

zk_client.add_listener(my_listener)

zk_client.start()

 

node_path = '/rootNode'

sub_node = 'loaderAgent102027165'

children = zk_client.get_children(node_path, watch=my_event_listener)

logger.info('there are %s children with names %s' % (len(children), children))

 

 

@zk_client.ChildrenWatch(node_path)

def watch_children(children):

   logger.info("Children are now: %s" % children)

 

 

@zk_client.DataWatch("%s/%s" % (node_path, sub_node))

def watch_node(data, state):

   """监视节点数据是否变化"""

   if state:

       logger.info('Version:%s, data:%s' % (state.version, data))

 

i = 0

while i < 1000:

   time.sleep(5)

   children = zk_client.get_children(node_path, watch=my_event_listener)

   logger.info('there are %s children with names %s' % (len(children), children))

   i += 1

 

zk_client.stop()

zk_client.close()

 

 

 

 

 

loadAgent.py

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

import threading

import configparser

import json

import subprocess

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

from log import logger

 

from myTCPServer import MyTCPServer

 

# 全局变量

zk_conn_stat = 0 # zookeeper连接状态 1-LOST   2-SUSPENDED 3-CONNECTED/RECONNECTED

registry_status = 0 # 服务器节点在zookeeper的注册状态  0-未注册、正在注册, 1-已注册

 

def restart_zk_client():

   '''重启zookeeper会话'''

 

   global zk_client

   global zk_conn_stat

   try:

       zk_client.restart()

       registry_zookeeper()

   except Exception as e:

       logger.error('重启zookeeper客户端异常:%s' % e)

 

 

def zk_conn_listener(state):

   '''zookeeper连接状态监听器'''

 

   global zk_conn_stat

   global registry_status

   if state == KazooState.LOST:

       logger.warn('zookeeper connection lost')

       zk_conn_stat = 1

       registry_status = 0 # 重置是否完成注册

       # Register somewhere that the session was lost

 

       thread = threading.Thread(target=restart_zk_client)

       thread.start()

 

   elif state == KazooState.SUSPENDED:

       logger.warn('zookeeper connection dicconnected')

       zk_conn_stat = 2

       # Handle being disconnected from Zookeeper

   else:

       zk_conn_stat = 3

       logger.info('zookeeper connection cconnected/reconnected')

       # Handle being connected/reconnected to Zookeeper

 

def registry_zookeeper():

   '''注册节点信息到zookeeper'''

 

   global node_parent_path

   global host

   global port

   global zk_client

   global zk_conn_stat

   global registry_status

 

   try:

       while zk_conn_stat != 3: # 如果zookeeper客户端没连上zookeeper,则先不让注册

           continue

 

       logger.info('正在注册负载机到zookeeper...')

       zk_client.ensure_path(node_parent_path)

 

       loader_agent_info = '{"host":"%s", "port":%s, "status":"idle"}' % (host, port)

 

       if not zk_client.exists('%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))):

           zk_client.create('%s/loaderAgent%s' % (node_parent_path, host.replace('.', '')), loader_agent_info.encode('utf-8'), ephemeral=True, sequence=False)

 

       # children = zk_client.get_children(node_parent_path)

       # logger.info('there are %s children with names: %s' % (len(children), children))

       # for child in children:

       #     logger.info(child)

       #     data, stat = zk_client.get('%s/%s' % (node_parent_path, child))

       #     logger.info(data)

       registry_status = 1 # 完成注册

       logger.info('注册负载机到zookeeper成功')

       return True

   except Exception as e:

       logger.error('注册负载机到zookeeper失败:%s' % e)

       return False

 

 

def start_tcpserver(tcpserver):

   '''启动tcp服务器'''

 

   tcpserver.start()

 

 

def get_server_status(proc_name):

   '''通过给定进程名称获取服务器状态'''

 

   with subprocess.Popen('ps -e | grep "%s"' % proc_name, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True) as proc:

       try:

           outs, errs = proc.communicate(timeout=30)

           outs = outs.strip()

           if outs.find(proc_name) != -1:

               # logger.info('获取负载机状态成功 %s' % outs)

               server_status = 'busy'

           elif outs == '':

               # logger.info('获取负载机状态成功')

               server_status = 'idle'

           else:

               logger.error('获取负载机状态失败:%s' % errs)

               server_status = 'unknow'

       except Exception as e:

           proc.kill()

           logger.error('获取负载机状态失败:%s' % e)

           server_status = 'unknow'

   return server_status

 

 

def update_server_status(interval, proc_name):

   '''定时检测并更新服务器状态:根据进程名称是否存在来判断服务器状态,如果存在则表示服务器被占用,标记服务器状态为busy,否则标记服务器状态为 idle

   如果根据进程名,检查进程失败,则标记服务器状态为unknow'''

 

   global node_parent_path

   global host

   global port

 

   while True:

       second_for_localtime1 = time.mktime(time.localtime()) # UTC时间(秒)

 

       if zk_conn_stat != 3: # 如果zookeeper客户端还没连上zookeeper,则不让进行后续操作

           continue

 

       if registry_status != 1: # 如果zookeeper客户端已连上zookeeper,但是还没注册节点到zookeeper,则不让进行后续操作

           continue

 

       server_status = get_server_status(proc_name)

       loader_agent_info = '{"host":"%s", "port":%s, "status":"%s"}' % (host, port, server_status)

       '''

       这里为啥要加这个判断:zookeeper删除临时节点存在延迟,如果zookeeper客户端主动关闭后快速重启并注册节点信息 这个过程耗时比较短,可能注册完节点信息时,zookeeper

       还没来得及删除重启之前创建的临时节点,而本次创建的临时节点路径和重启前的一模一样,这样导致的结果是,zookeeper接下来的删除操作,会把重启后注册的节点也删除

      '''

       if zk_client.exists('%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))):

           zk_client.set('%s/loaderAgent%s' % (node_parent_path, host.replace('.', '')), loader_agent_info.encode('utf-8'))

       else:

           registry_zookeeper()

 

       second_for_localtime2 = time.mktime(time.localtime()) # UTC时间(秒)

       time_difference = second_for_localtime2 - second_for_localtime1

       if time_difference < interval:

           time.sleep(interval - time_difference)

 

 

if __name__ == '__main__':

   logger.info('正在启动代理...')

 

   try:

       logger.info('正在读取zookeeper配置...')

       config_parser = configparser.ConfigParser()

       config_parser.read('./conf/zookeeper.conf', encoding='utf-8-sig')

       zk_hosts = config_parser.get('ZOOKEEPER', 'hosts').replace(',', ',').strip()

       node_parent_path = config_parser.get('ZOOKEEPER', 'nodeParentPath').replace(',', ',').strip()

 

       logger.info('正在构建并启动zookeeper客户端...')

       zk_client = KazooClient(hosts=zk_hosts)

       zk_client.add_listener(zk_conn_listener)

       zk_client.start()

   except Exception as e:

       logger.error('初始化zookeeper客户端失败: %s' % e)

       exit(1)

 

   try:

       config_parser.clear()

       config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')

       host = config_parser.get('TCPSERVER', 'host')

       port = int(config_parser.get('TCPSERVER', 'port'))

       tcp_server  = MyTCPServer(host, port)

       thread = threading.Thread(target=start_tcpserver, args=(tcp_server,))

       thread.start()

   except Exception as e:

       logger.error('TCPServer启动失败:%s,请检查配置/conf/tcpserver.conf是否正确' % e)

       exit(1)

 

 

   try:

       # 注册到zookeeper

       registry_zookeeper()

 

       config_parser.clear()

       config_parser.read('./conf/agent.conf', encoding='utf-8-sig')

       interval = int(config_parser.get('AGENT', 'interval'))

       proc = config_parser.get('AGENT', 'proc').strip()

 

       # 定时更新服务器节点繁忙状态

       update_server_status(interval, proc)

   except Exception as e:

       logger.error('zk_client运行失败:%s,请检查配置/conf/agent.conf是否正确' % e)

       exit(1)

 

 

 

运行效果

 

 

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
1天前
|
机器学习/深度学习 人工智能 算法
海洋生物识别系统+图像识别+Python+人工智能课设+深度学习+卷积神经网络算法+TensorFlow
海洋生物识别系统。以Python作为主要编程语言,通过TensorFlow搭建ResNet50卷积神经网络算法,通过对22种常见的海洋生物('蛤蜊', '珊瑚', '螃蟹', '海豚', '鳗鱼', '水母', '龙虾', '海蛞蝓', '章鱼', '水獭', '企鹅', '河豚', '魔鬼鱼', '海胆', '海马', '海豹', '鲨鱼', '虾', '鱿鱼', '海星', '海龟', '鲸鱼')数据集进行训练,得到一个识别精度较高的模型文件,然后使用Django开发一个Web网页平台操作界面,实现用户上传一张海洋生物图片识别其名称。
30 7
海洋生物识别系统+图像识别+Python+人工智能课设+深度学习+卷积神经网络算法+TensorFlow
|
2天前
|
机器学习/深度学习 人工智能 算法
【昆虫识别系统】图像识别Python+卷积神经网络算法+人工智能+深度学习+机器学习+TensorFlow+ResNet50
昆虫识别系统,使用Python作为主要开发语言。通过TensorFlow搭建ResNet50卷积神经网络算法(CNN)模型。通过对10种常见的昆虫图片数据集('蜜蜂', '甲虫', '蝴蝶', '蝉', '蜻蜓', '蚱蜢', '蛾', '蝎子', '蜗牛', '蜘蛛')进行训练,得到一个识别精度较高的H5格式模型文件,然后使用Django搭建Web网页端可视化操作界面,实现用户上传一张昆虫图片识别其名称。
35 7
【昆虫识别系统】图像识别Python+卷积神经网络算法+人工智能+深度学习+机器学习+TensorFlow+ResNet50
|
2天前
|
机器学习/深度学习 人工智能 算法
【球类识别系统】图像识别Python+卷积神经网络算法+人工智能+深度学习+TensorFlow
球类识别系统,本系统使用Python作为主要编程语言,基于TensorFlow搭建ResNet50卷积神经网络算法模型,通过收集 '美式足球', '棒球', '篮球', '台球', '保龄球', '板球', '足球', '高尔夫球', '曲棍球', '冰球', '橄榄球', '羽毛球', '乒乓球', '网球', '排球'等15种常见的球类图像作为数据集,然后进行训练,最终得到一个识别精度较高的模型文件。再使用Django开发Web网页端可视化界面平台,实现用户上传一张球类图片识别其名称。
22 7
【球类识别系统】图像识别Python+卷积神经网络算法+人工智能+深度学习+TensorFlow
|
2天前
|
人工智能 数据挖掘 大数据
538个代码示例!麻省理工教授的Python程序设计+人工智能案例实践
Python简单易学,且提供了丰富的第三方库,可以用较少的代码完成较多的工作,使开发者能够专注于如何解决问题而只花较少的时间去考虑如何编程。 此外,Python还具有免费开源、跨平台、面向对象、胶水语言等优点,在系统编程、图形界面开发、科学计算、Web开发、数据分析、人工智能等方面有广泛应用。 尤其是在数据分析和人工智能方面,Python已成为最受开发者欢迎的编程语言之一,不仅大量计算机专业人员选择使用Python进行快速开发,许多非计算机专业人员也纷纷选择Python语言来解决专业问题。 由于Python应用广泛,关于Python的参考书目前已经有很多,但将Python编程与数据分析、人工智
|
2天前
|
JSON API 数据库
Python使用Quart作为web服务器的代码实现
Quart 是一个异步的 Web 框架,它使用 ASGI 接口(Asynchronous Server Gateway Interface)而不是传统的 WSGI(Web Server Gateway Interface)。这使得 Quart 特别适合用于构建需要处理大量并发连接的高性能 Web 应用程序。与 Flask 类似,Quart 也非常灵活,可以轻松地构建 RESTful API、WebSockets、HTTP/2 服务器推送等。
|
18小时前
|
数据采集 自然语言处理 数据可视化
拿来及用的Python词云图代码 | wordcloud生成词云详解
词云也叫文字云,是一种可视化的结果呈现,常用在爬虫数据分析中,原理就是统计文本中高频出现的词,过滤掉某些干扰词,将结果生成一张图片,直观的获取数据的重点信息。今天,我们就来学习一下Python生成词云的常用库wordcloud。
|
1天前
|
数据采集 人工智能 JavaScript
如何使用Python执行js代码
如何使用Python执行js代码
|
1天前
|
Web App开发 数据采集 JavaScript
python执行js代码几个方法
python执行js代码几个方法
|
3天前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
9 1
|
4天前
|
SQL Oracle 关系型数据库
Python连接数据库进行数据查询的操作代码
mysql数据库(mariadb) 连接数据库 首先,你需要使用MySQLdb.connect()函数建立与MySQL数据库的连接。你需要提供数据库服务器的地址(host),用户名(user),密码(passwd),以及你想要操作的数据库名称(db)。 创建Cursor对象 一旦建立了数据库连接,你可以使用连接对象的cursor()方法来创建一个cursor对象。这个方法返回一个cursor实例,你可以使用这个实例来执行SQL查询和命令。