Python 基于python操纵zookeeper介绍

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Python 基于python操纵zookeeper介绍

基于python操纵zookeeper介绍

 

by:授客  QQ:1033553122

测试环境

Win7 64位

 

Python 3.3.4

 

kazoo-2.6.1-py2.py3-none-any.whl(windows)

kazoo-2.6.1.tar.gz (linux)

https://pypi.org/project/kazoo/#files

 

zookeeper-3.4.13.tar.gz

下载地址:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

 

 

 

代码实践

kazooStudy.py

#!/usr/bin/env python 3.4.0

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

import threading

import time

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

from kazoo.retry import KazooRetry

 

 

def restart_zk_client():

   '''重启zookeeper会话'''

   global zk_client

   global zk_conn_stat

   try:

       zk_client.restart()

   except Exception as e:

       print('重启zookeeper客户端异常:%s' % e)

 

zk_conn_stat = 0 # zookeeper连接状态 1-LOST   2-SUSPENDED 3-CONNECTED/RECONNECTED

def zk_conn_listener(state):

   '''zookeeper连接状态监听器'''

 

   global  zk_conn_stat

   if state == KazooState.LOST:

       print('zookeeper connection lost')

       zk_conn_stat = 1

       # Register somewhere that the session was lost

 

       thread = threading.Thread(target=restart_zk_client)

       thread.start()

 

   elif state == KazooState.SUSPENDED:

       print('zookeeper connection dicconnected')

       zk_conn_stat = 2

       # Handle being disconnected from Zookeeper

   else:

       zk_conn_stat = 3

       print('zookeeper connection cconnected/reconnected')

       # Handle being connected/reconnected to Zookeeper

 

# 监视器

# 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个WatchedEvent实例

def event_listener(event):

   print(event)

 

 

 

if __name__ == '__main__':

   try:

       # 建立连接

       zk_client = KazooClient(hosts='127.0.0.1:2181')

       zk_client.add_listener(zk_conn_listener) # 添加监听器,监听连接状态

       zk_client.start() # 初始化到zk的连接,可以设置超时时间 zk_client.start(timeout=15) 默认15秒

 

       print('zk_client state:', zk_client.state) # 查看链接状态

 

       # 创建节点

       # ensure_path() 递归创建path中不存在的节点,但是不能为节点设置数据,仅ACL.

       zk_client.ensure_path('/node1')

 

       # 创建永久节点

       # create创建节点的同时,可为节点设置数据,要求path路径必须存在

       if not zk_client.exists('/node1/subNode1'):

           zk_client.create('/node1/subNode1', b'sub node1')

 

       # 创建临时节点

       # 注意:会话丢失、重启会话会导致zookeeper删除重启会话前创建的临时节点

       if not zk_client.exists('/node1/subNode2'):

           zk_client.create('/node1/subNode2', b'sub node2', ephemeral=True)

 

       # 创建有序临时节点

       zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

       # 读取数据

       # 判断节点是否存在

       if zk_client.exists('/node1'): # 如果返回值为None则表示不存在给定节点

           print('存在节点node1,节点路径/node1')

 

           # 获取节点相关数据

           data, stat = zk_client.get('/node1')

           if stat:

               print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

 

           # 获取给定节点的子节点

           children = zk_client.get_children('/node1')

           print('node1子节点 有 %s 子节点,节点名称为: %s' % (len(children), children))

 

           print('/ 子节点', zk_client.get_children('/'))

 

       # 更新节点

       # 更新节点数据

       zk_client.set("/node1/subNode2", b"some new data")

 

       # 删除节点 recursive参数可选,递归删除节点数据

       zk_client.delete("/node1", recursive=True)

 

 

       # 重试命令

       try:

           result = zk_client.retry(zk_client.get, "/node1/subNode3")

           print(result)

 

           # 自定义重试

           # max_tries 出错最大重试次数, ignore_expire False-重试的时候忽略会话过期,否则不忽略

           kr = KazooRetry(max_tries=3, ignore_expire=False)

           result = kr(zk_client.get, "/node1/subNode3")

       except Exception as e:

           print('/node1/subNode3 不存在,所以会运行出错')

 

 

       # 释放客户端占用资源,移除连接

       zk_client.stop()

 

       #  zk_client.stop() 会导致zk_client连接状态变成 LOST,进而触发线程调用函数 restart_zk_client,

       # 该函数未执行完成的情况下,如果马上执行类似get,create等函数,会导致运行出错

       #

 

       while zk_conn_stat != 3:

           continue

       else:

           i = 0

           while i < 3000:

               if i % 200 == 0:

                   time.sleep(2)

                   print('创建新节点')

                   zk_client.ensure_path('/node1')

                   zk_client.ensure_path('/node1/subNode2')

                   zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

                   zk_client.set('/node1/subNode2', b'new data')

               i += 1

 

 

 

       # 关闭客户端前必须先调用stop,否则会报错

       zk_client.stop()

 

       # 关闭客户端

       zk_client.close()

   except Exception as e:

       print('运行出错:%s' % e)

 

 

monitor.py

#!/usr/bin/env python

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

import time

 

from kazoo.client import  KazooClient

from kazoo.client import KazooState

 

zk = KazooClient(hosts='10.118.52.26:2181')

zk.start()

 

@zk.add_listener

def my_listener(state):

   if state == KazooState.LOST:

       print('LOST')

       # Register somewhere that the session was lost

   elif state == KazooState.SUSPENDED:

       print('SUSPENDED')

       # Handle being disconnected from Zookeeper

   else:

       pass

       print('CONNECTED')

       # Handle being connected/reconnected to Zookeeper

 

# 监视器

# 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个WatchedEvent实例

def event_listener(event):

   print(event)

 

children = zk.get_children('/node1',watch=event_listener)

print('node1 has %s children with names %s' % (len(children), children))

 

# 更高级监视api

# 监视子节点的编号

@zk.ChildrenWatch('/node1')

def watch_children(children):

   print("Children are now: %s" % children)

 

 

# 监视节点数据变更

@zk.DataWatch("/node1/subNode2") #

def watch_node(data, state):

   """监视节点数据是否变化"""

   if state:

       print("Version:", state.version, "data:", data)

 

# 空转

i = 0

while i< 100:

   # children = zk.get_children('/node1',watch=event_listener)

   # print('node1 has %s children with names %s' % (len(children), children))

   time.sleep(1)

 

zk.stop()

zk.close()

 

 

 

关于kazooClient连接状态说明

LOST

CONNECTED

SUSPENDED

 

KazooClient客户端实例刚创建时,处于LOST状态,同zookeeper建立连接后,转为CONNECTED 。如果连接出问题、切换到不同的zookeeper集群几点,转为SUSPENDED状态,当你知道暂时不能执行命令,如果zookeeper节点不再是集群的一部分,连接将丢失,也会导致 SUSPENDED状态

 

客户端再次同zookeeper建立连接,如果会话不存在,客户端连接状态将转为LOST,如果会话没有过期,可用则转为CONNECTED

 

 

运行效果

 

参考链接:

https://kazoo.readthedocs.io/en/latest/basic_usage.html

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3月前
|
XML JSON 定位技术
在Python中操纵json数据的最佳方式
在Python中操纵json数据的最佳方式
|
6月前
|
安全 API 数据安全/隐私保护
使用Python操纵Word自动编写离职报告
使用Python操纵Word自动编写离职报告
45 0
|
JSON 网络协议 Shell
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 2
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现
259 0
|
网络协议 Linux 测试技术
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现
190 0
|
存储 监控 NoSQL
Python 基于python操纵redis入门介绍
Python 基于python操纵redis入门介绍
148 0
|
Python
Python:kazoo模块与Zookeeper交互
Python:kazoo模块与Zookeeper交互
109 0
|
Python
Python:kazoo模块与Zookeeper交互
Python:kazoo模块与Zookeeper交互
114 0
|
Python
Python:kazoo模块与Zookeeper交互
Python:kazoo模块与Zookeeper交互
170 0
|
2天前
|
Python
不容错过!Python中图的精妙表示与高效遍历策略,提升你的编程艺术感
本文介绍了Python中图的表示方法及遍历策略。图可通过邻接表或邻接矩阵表示,前者节省空间适合稀疏图,后者便于检查连接但占用更多空间。文章详细展示了邻接表和邻接矩阵的实现,并讲解了深度优先搜索(DFS)和广度优先搜索(BFS)的遍历方法,帮助读者掌握图的基本操作和应用技巧。
13 4
|
2天前
|
设计模式 程序员 数据处理
编程之旅:探索Python中的装饰器
【10月更文挑战第34天】在编程的海洋中,Python这艘航船以其简洁优雅著称。其中,装饰器作为一项高级特性,如同船上的风帆,让代码更加灵活和强大。本文将带你领略装饰器的奥秘,从基础概念到实际应用,一起感受编程之美。
下一篇
无影云桌面