ZooKeeper 的典型应用场景有统一配置、统一命名服务、集群管理、分布式锁和分布式队列。
今天,我要实验的场景是集群管理中的 HA(高可用)功能。
实验架构如下:

Server[A/B]代码:
此处,我们需要用到一个第三方模块:
zkclient.py(https://github.com/phunt/zk-smoketest/blob/master/zkclient.py)
该模块需要稍作修改,因为我这里的 ACL 采用的是 digest 方案,而不是 world。
先导入几个模块:
import base64
import hashlib
定义些变量
# Credentials in "user:password" form; the same string is later passed to
# zookeeper.add_auth() to authenticate the connection handle.
auth = "badboy:test"
user = "badboy"

# The digest ACL id has the form "user:base64(sha1('user:password'))".
# hashlib needs bytes and b64encode returns bytes on Python 3, so convert
# explicitly; on Python 2 both conversions are no-ops for a plain str.
_auth_bytes = auth if isinstance(auth, bytes) else auth.encode("utf-8")
_digest_b64 = base64.b64encode(hashlib.sha1(_auth_bytes).digest())
if not isinstance(_digest_b64, str):
    _digest_b64 = _digest_b64.decode("ascii")
user_auth = "%s:%s" % (user, _digest_b64)

# Replaces zkclient's original world-readable ACL:
#ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
# 0x1f sets all five permission bits, granted only to the digest identity.
ZOO_DIGEST_ACL_SAFE = {"perms": 0x1f, "scheme": "digest", "id": user_auth}
然后将代码中用到 ZOO_OPEN_ACL_UNSAFE 的地方都改为 ZOO_DIGEST_ACL_SAFE。
最后是认证:我们需要在 ZKClient 类的 __init__ 方法末尾添加一行 zookeeper.add_auth(self.handle, "digest", auth, None)。
这样,我们得到的连接句柄就是经过认证的了。
注意:Server A 和 Server B 的代码不同之处在于 service 函数,两者输出的 IP 内容不一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
import logging
from os.path import basename, join ,dirname
from zkclient import ZKClient, zookeeper
from SimpleXMLRPCServer import SimpleXMLRPCServer
# DEBUG=True logs to the console; DEBUG=False appends to app.log instead.
DEBUG = True
# Directory of this script; app.log is created here when DEBUG is off.
BASE_DIR = dirname(__file__)

if DEBUG:
    # With no filename, basicConfig installs a stream (console) handler and
    # never consults filemode.
    logname = None
    file_mode = None
else:
    logname = join(BASE_DIR, "app.log")
    file_mode = "a"

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    filename=logname,
    filemode=file_mode,
)

# The rest of the script logs through this alias of the logging module.
log = logging
class TCZookeeper(object):
    """Worker that registers itself in ZooKeeper and serves one XML-RPC call.

    On construction the object connects to ZooKeeper, makes sure the parent
    znodes exist, registers an ephemeral sequential znode whose data is this
    host's IP, and then blocks serving XML-RPC requests.

    To run the second server, change SERVICE_HOST only; it drives the
    registered znode data, the bind address and the service() reply.
    """

    ZK_HOST = "192.168.x.5:2181"
    ROOT = "/tc"
    # znode paths always use "/", so build them by hand rather than with
    # os.path.join, whose separator depends on the platform.
    WORKERS_PATH = ROOT + "/nodes"
    MASTERS_NUM = 1
    TIMEOUT = 1000
    # Address this worker binds to and advertises through ZooKeeper.
    SERVICE_HOST = "192.168.x.4"
    SERVICE_PORT = 8000

    def __init__(self, verbose=True):
        """Connect, register and start serving; does not return.

        verbose -- when true, progress messages are written to the log.
        """
        self.VERBOSE = verbose
        self.masters = []
        self.path = None
        self.zk = ZKClient(self.ZK_HOST, timeout=self.TIMEOUT)
        self.say("login ok!")
        self.__init_zk()
        self.register()
        self.start_service()

    def __init_zk(self):
        """Create the ROOT and WORKERS_PATH znodes if they are missing."""
        for node in (self.ROOT, self.WORKERS_PATH):
            if self.zk.exists(node):
                continue
            try:
                self.zk.create(node, "")
            except zookeeper.NodeExistsException:
                # Another server created it between exists() and create();
                # that is fine. Any other error (auth, connection) propagates.
                pass

    def register(self):
        """Create this worker's ephemeral sequential znode.

        Stores the full znode path in self.paths and its last component
        (for example "node0000000034") in self.path.
        """
        self.paths = self.zk.create(
            self.WORKERS_PATH + "/node",
            self.SERVICE_HOST,
            flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE
        )
        self.path = basename(self.paths)
        self.say("register ok! I'm %s" % self.path)

    def service(self):
        """Return the string identifying which server answered."""
        return "My IP is:%s" % self.SERVICE_HOST

    def start_service(self):
        """Expose service() over XML-RPC and serve requests forever."""
        server = SimpleXMLRPCServer((self.SERVICE_HOST, self.SERVICE_PORT))
        self.say("Listening on port %d..." % self.SERVICE_PORT)
        server.register_function(self.service, "service")
        server.serve_forever()

    def say(self, msg):
        """Log msg at INFO level when verbose output is enabled."""
        if self.VERBOSE:
            log.info(msg)
if __name__ == "__main__":
    # The constructor connects, registers and then blocks serving XML-RPC.
    tczk = TCZookeeper()
|
启动Server[A|B]输出如下:
1
2
3
4
5
6
7
8
9
10
11
|
A机 192.168.x.4:
[root@web02 scripts]
[2014-12-24 14:40:37] INFO login ok!
[2014-12-24 14:40:37] INFO register ok! I'm node0000000034
[2014-12-24 14:40:37] INFO Listening on port 8000...
B机 192.168.x.5(ZooKeeper 也在这台机器上):
[root@web02 scripts]
[2014-12-24 14:40:37] INFO login ok!
[2014-12-24 14:40:37] INFO register ok! I'm node0000000035
[2014-12-24 14:40:37] INFO Listening on port 8000...
|
我们再来看看Agent代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import xmlrpclib
import logging
from os.path import basename, join ,dirname
from zkclient import ZKClient, watchmethod
from SimpleXMLRPCServer import SimpleXMLRPCServer
# DEBUG=True logs to the console; DEBUG=False appends to app.log instead.
DEBUG = True
# Directory of this script; app.log is created here when DEBUG is off.
BASE_DIR = dirname(__file__)

if DEBUG:
    # With no filename, basicConfig installs a stream (console) handler and
    # never consults filemode.
    logname = None
    file_mode = None
else:
    logname = join(BASE_DIR, "app.log")
    file_mode = "a"

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    filename=logname,
    filemode=file_mode,
)

# The rest of the script logs through this alias of the logging module.
log = logging
class TCZookeeper(object):
    """Agent that tracks the current master worker and proxies RPC to it.

    The agent watches the children of NODES_PATH. The child with the
    smallest name is treated as the master, and the data stored in that
    znode is used as the master's IP address. Clients call rpc() on the
    agent, which forwards the call to the master's service() method.
    """

    ZK_HOST = "192.168.x.5:2181"
    ROOT = "/tc"
    # znode paths always use "/", so build them by hand rather than with
    # os.path.join, whose separator depends on the platform.
    NODES_PATH = ROOT + "/nodes"
    MASTERS_NUM = 1
    TIMEOUT = 1000
    # Address the agent's own XML-RPC endpoint binds to.
    LISTEN_HOST = "192.168.x.3"
    LISTEN_PORT = 8000
    # Port the workers' XML-RPC service listens on.
    SERVICE_PORT = 8000

    def __init__(self, verbose=True):
        """Connect to ZooKeeper and look up the current master.

        verbose -- when true, progress messages are written to the log.
        """
        self.VERBOSE = verbose
        self.masters = []
        self.path = None
        # Result of zk.get() on the master znode; index 0 is used as the IP.
        # None means no master is known.
        self.serverip = None
        self.zk = ZKClient(self.ZK_HOST, timeout=self.TIMEOUT)
        self.say("login ok!")
        self.get_master()

    def get_master(self):
        """Refresh self.masters, self.path and self.serverip from ZooKeeper.

        Also installs a watch on NODES_PATH so that this method runs again
        whenever the set of registered workers changes.
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get master again.")
            self.get_master()

        children = self.zk.get_children(self.NODES_PATH, watcher)
        children.sort()
        self.say("%s's children: %s" % (self.NODES_PATH, children))
        self.masters = children[:self.MASTERS_NUM]
        if not self.masters:
            # Every worker is gone. Forget the stale master instead of
            # failing with IndexError; the watch set above calls us again
            # as soon as a worker registers.
            self.path = None
            self.serverip = None
            self.say("no worker registered, waiting for one to appear.")
            return
        self.path = self.NODES_PATH + "/" + self.masters[0]
        self.serverip = self.zk.get(self.path)
        self.say("MasterIP:%s" % self.serverip[0])

    def rpc(self):
        """Forward a call to the master's service() and return its reply.

        Returns "content: <reply>" on success and None when no master is
        known or the call fails. Note that the XML-RPC server cannot marshal
        None unless allow_none is enabled, so remote callers see a fault in
        that case.
        """
        # Snapshot the value: the watcher may replace it concurrently.
        master = self.serverip
        if not master:
            self.say("rpc skipped: no master available.")
            return None
        try:
            proxy = xmlrpclib.ServerProxy(
                "http://%s:%d/" % (master[0], self.SERVICE_PORT))
            return "content: %s" % str(proxy.service())
        except Exception as exc:
            # Best effort: the master may have just died and the watch has
            # not fired yet. Report the failure instead of hiding it.
            self.say("rpc to master %s failed: %s" % (master[0], exc))
            return None

    def start_service(self):
        """Expose rpc() over XML-RPC and serve requests forever."""
        server = SimpleXMLRPCServer((self.LISTEN_HOST, self.LISTEN_PORT))
        self.say("Listening on port %d..." % self.LISTEN_PORT)
        server.register_function(self.rpc, "rpc")
        server.serve_forever()

    def say(self, msg):
        """Log msg at INFO level when verbose output is enabled."""
        if self.VERBOSE:
            log.info(msg)
if __name__ == "__main__":
    # The constructor connects and resolves the master; serving is started
    # explicitly and blocks forever.
    tczk = TCZookeeper()
    tczk.start_service()
|
启动agent输出如下:
1
2
3
4
5
|
[root@web01 scripts]
[2014-12-24 14:43:29] INFO login ok!
[2014-12-24 14:43:29] INFO /tc/nodes's children: ['node0000000034', 'node0000000035']
[2014-12-24 14:43:29] INFO MasterIP:192.168.x.4
[2014-12-24 14:43:29] INFO Listening on port 8000...
|
最后,我们看下client
1
2
3
4
5
6
7
8
9
10
11
|
import xmlrpclib
import time
def main():
    """Call the agent's rpc() once per second and print each reply."""
    while True:
        try:
            proxy = xmlrpclib.ServerProxy("http://192.168.x.3:8000/")
            # Parenthesised form prints the same on Python 2 and parses on 3.
            print(proxy.rpc())
        except Exception:
            # Deliberately silent: calls fail briefly while the agent
            # switches master. Catching Exception rather than everything
            # keeps Ctrl-C able to stop the loop.
            pass
        time.sleep(1)


if __name__ == "__main__":
    main()
|
输出结果:
root@saltstack:/scripts# python client.py
content: My IP is:192.168.x.4
content: My IP is:192.168.x.4
content: My IP is:192.168.x.4
此时,我们把 192.168.x.4 上的服务关掉。
很快,打印的内容就变成:
content: My IP is:192.168.x.5
content: My IP is:192.168.x.5
再看agent输出有,
[2014-12-24 14:47:34] INFO child changed, try to get master again.
[2014-12-24 14:47:34] INFO /tc/nodes's children: ['node0000000035']
[2014-12-24 14:47:34] INFO MasterIP:192.168.x.5
可以看到,MasterIP 原先是 192.168.x.4,现在变成了 192.168.x.5。
至此,HA 功能的实验就完成了。
本文转自hahazhu0634 51CTO博客,原文链接:http://blog.51cto.com/5ydycm/1595201,如需转载请自行联系原作者