一、zookeeper常用场景总结

1.zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务。zookeeper本身就是一个分布式程序,只要有半数以上存活,zookeeper就能提供服务。

2.基本功能有两个:管理(存储、读取)用户程序提交的数据;并为用户程序提供数据节点监听服务。

3.Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……

4.常用场景


场景一:状态感知:(图解:当一个接受数据的服务器挂了,则被监听,其他服务器先顶替)
2.png

场景二:主从选举

3.png

场景三:为soldcloud提供统一配置

4.png

二、zookeeper集群节点角色分配原理

群首(leader),追随者(follower),观察者(observer)。

        如何在zookeeper集群中选举出一个leader,zookeeper使用了三种算法,具体使用哪种算法,在配置文件中是可以配置的,对应的配置项是”electionAlg”,其中1对应的是LeaderElection算法,2对应的是AuthFastLeaderElection算法,3对应的是FastLeaderElection算法.默认使用FastLeaderElection算法.


paxos算法的理论基础.

1) 数据恢复阶段

    首先,每个在zookeeper服务器先读取当前保存在磁盘的数据,zookeeper中的每份数据,都有一个对应的id值,这个值是依次递增的,换言之,越新的数据,对应的ID值就越大.

2) 向其他节点发送投票值

    在读取数据完毕之后,每个zookeeper服务器发送自己选举的leader(首次选自己),这个协议中包含了以下几部分的数据:

        a)所选举leader的id(就是配置文件中写好的每个服务器的id) ,在初始阶段,每台服务器的这个值都是自己服务器的id,也就是它们都选举自己为leader.

        b) 服务器最大数据的id,这个值大的服务器,说明存放了更新的数据.

        c)逻辑时钟的值,这个值从0开始递增,每次选举对应一个值,也就是说:  如果在同一次选举中,那么这个值应该是一致的 ;  逻辑时钟值越大,说明这一次选举leader的进程更新.

        d) 本机在当前选举过程中的状态,有以下几种:LOOKING,FOLLOWING,OBSERVING,LEADING,顾名思义不必解释了吧.

3)接受来自其他节点的数据

    每台服务器将自己服务器的以上数据发送到集群中的其他服务器之后,同样的也需要接收来自其他服务器的数据,它将做以下的处理:

(1)如果所接收数据中服务器的状态还是在选举阶段(LOOKING 状态),那么首先判断逻辑时钟值,又分为以下三种情况:

     a) 如果发送过来的逻辑时钟大于目前的逻辑时钟,那么说明这是更新的一次选举,此时需要更新一下本机的逻辑时钟值,同时将之前收集到的来自其他服务器的选举清空,因为这些数据已经不再有效了.然后判断是否需要更新当前自己的选举情况.在这里是根据选举leader id,保存的最大数据id来进行判断的,这两种数据之间对这个选举结果的影响的权重关系是:首先看数据id,数据id大者胜出;其次再判断leader id,leader id大者胜出.然后再将自身最新的选举结果(也就是上面提到的三种数据)广播给其他服务器).

    b) 发送过来数据的逻辑时钟小于本机的逻辑时钟,说明对方在一个相对较早的选举进程中,这里只需要将本机的数据发送过去就是了

    c) 两边的逻辑时钟相同,此时也只是调用totalOrderPredicate函数判断是否需要更新本机的数据,如果更新了再将自己最新的选举结果广播出去就是了.


然后再处理两种情况:

    1)服务器判断是不是已经收集到了所有服务器的选举状态,如果是,那么这台服务器选举的leader就定下来了,然后根据选举结果设置自己的角色(FOLLOWING还是LEADER),然后退出选举过程就是了.

    2)即使没有收集到所有服务器的选举状态,也可以根据该节点上选择的最新的leader是不是得到了超过半数以上服务器的支持,如果是,那么当前线程将被阻塞等待一段时间(这个时间在finalizeWait定义)看看是不是还会收到当前leader的数据更优的leader,如果经过一段时间还没有这个新的leader提出来,那么这台服务器最终的leader就确定了,否则进行下一次选举. 


(2) 如果所接收服务器不在选举状态,也就是在FOLLOWING或者LEADING状态

做以下两个判断:

    a) 如果逻辑时钟相同,将该数据保存到recvset,如果所接收服务器宣称自己是leader,那么将判断是不是有半数以上的服务器选举它,如果是则设置选举状态退出选举过程

    b) 否则这是一条与当前逻辑时钟不符合的消息,那么说明在另一个选举过程中已经有了选举结果,于是将该选举结果加入到outofelection集合中,再根据outofelection来判断是否可以结束选举,如果可以也是保存逻辑时钟,设置选举状态,退出选举过程.

代码如下:


以一个简单的例子来说明整个选举的过程.

假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么.

1) 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态

2) 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.

3) 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.

4) 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.

5) 服务器5启动,同4一样,当小弟.


三、安装zookeeper

机器部署

    安装到3台虚拟机上

    安装好JDK

上传

 上传用工具。

解压

    su  hadoop(切换到hadoop用户)

    tar -zxvf zookeeper-3.4.5.tar.gz(解压)

重命名

    mv zookeeper-3.4.5 zookeeper(重命名文件夹zookeeper-3.4.5zookeeper

修改环境变量

    1su  root(切换用户到root)

    2vi /etc/profile(修改文件)

    3、添加内容:

export ZOOKEEPER_HOME=/home/hadoop/zookeeper

export PATH=$PATH:$ZOOKEEPER_HOME/bin

    4、重新编译文件:

    source /etc/profile

    5、注意:3zookeeper都需要修改

    6、修改完成后切换回hadoop用户:

    su - hadoop

修改配置文件

    1、用hadoop用户操作

    cd zookeeper/conf

    cp zoo_sample.cfg zoo.cfg

    2、vi zoo.cfg

    3、添加内容:

dataDir=/home/hadoop/zookeeper/data

dataLogDir=/home/hadoop/zookeeper/log

server.1=slave1:2888:3888 (主机名心跳端口、数据端口)

server.2=slave2:2888:3888

server.3=slave3:2888:3888

    4、创建文件夹:

    cd /home/hadoop/zookeeper/

    mkdir -m 755 data

    mkdir -m 755 log

    5、在data文件夹下新建myid文件,myid的文件内容为:

    cd data

    vi myid

    添加内容:

1


将集群下发到其他机器上

    scp -r /home/hadoop/zookeeper hadoop@slave2:/home/hadoop/

    scp -r /home/hadoop/zookeeper hadoop@slave3:/home/hadoop/

修改其他机器的配置文件

    到slave2上:修改myid为:2

    slave3上:修改myid为:3

启动(每台机器

    zkServer.sh start

10 查看集群状态

1、 jps(查看进程)

2、 zkServer.sh status(查看集群状态,主从信息)


四、zookeeper结构和命令

1.1. zookeeper特性

    1、Zookeeper:一个leader,多个follower组成的集群

    2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的

    3、分布式读写,更新请求转发,由leader实施

    4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行

    5、数据更新原子性,一次数据更新要么成功,要么失败

    6、实时性,在一定时间范围内,client能读到最新数据

1.2. zookeeper数据结构

    1、层次化的目录结构,命名符合常规文件系统规范

    2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识

    3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)

    4、客户端应用可以在节点上设置监视器

1.3. 数据结构的图

    图片1.png

    任何一个节点内部都可以保存数据。

1.4. 节点类型

    1、Znode有两种类型:

        短暂(ephemeral)(断开连接自己删除)

        持久(persistent)(断开连接不删除)

    2Znode四种形式的目录节点(默认是persistent 

        PERSISTENT

        PERSISTENT_SEQUENTIAL(持久序列/test0000000019 

        EPHEMERAL

        EPHEMERAL_SEQUENTIAL

    3、创建znode时设置顺序标识znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护,如创建的时候是aa,完成后是aa0000001

    4、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

1.5. znode数据结构类型监听

    当一个客户端修改了zookeeper集群理解一个节点的数据之后,其他客户端是可以感知更新的通过查询。但是当节点数量很小的时候,数据一致性很快,但是当数据节点有几十台,会有明显的延迟

    当get path[watch],有一个监听功能:当数据发生变化,在客户端通过watch可以感知到。当有了这个功能,客户端程序,可以通过写逻辑代码进行监听做出相应处理。

    注:获取数据的时候,注册监听,只能生效一次。还需注意监听也有类型,包括阶段数据变化(get path [watch])、节点类型变化(ls path [watch])。



五、zookeeper的JavaAPI使用


zookeeper客户端内的守护线程及监听机制

zookeeper客户端内的守护线程及监听机制.png


  1. main线程建立zkclient对象 ,同时有connect和listener两个线程;

  2. connect线程发出getchildren请求,会一起发给zookeeper集群以下信息:IP,path,port

  3. zookeeper集群发现/path下的数据发生变化,根据IPport就会找见客户端,进行socket通信,访问客户端的listener线程

  4. listener线程就会再调用zkclient对象的process方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public  class  SimpleZkClient {
 
     private  static  final  String connectString =  "mini1:2181,mini2:2181,mini3:2181" ;
     private  static  final  int  sessionTimeout =  2000 ;
 
     ZooKeeper zkClient =  null ;
 
     @Before
     public  void  init()  throws  Exception {
         zkClient =  new  ZooKeeper(connectString, sessionTimeout,  new  Watcher() {
             @Override
             public  void  process(WatchedEvent event) {
                 // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                 System.out.println(event.getType() +  "---"  + event.getPath());
                 try  {
                     zkClient.getChildren( "/" true );
                 catch  (Exception e) {
                 }
             }
         });
 
     }
 
     /**
      * 数据的增删改查
     
      * @throws InterruptedException
      * @throws KeeperException
      */
 
     // 创建数据节点到zk中
     public  void  testCreate()  throws  KeeperException, InterruptedException {
         // 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型
         String nodeCreated = zkClient.create( "/eclipse" "hellozk" .getBytes(), Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
         // 上传的数据可以是任何类型,但都要转成byte[]
     }
 
     // 判断znode是否存在
     @Test
     public  void  testExist()  throws  Exception {
         Stat stat = zkClient.exists( "/eclipse" false );
         System.out.println(stat ==  null  "not exist"  "exist" );
 
     }
 
     // 获取子节点
     @Test
     public  void  getChildren()  throws  Exception {
         List<String> children = zkClient.getChildren( "/" true );
         for  (String child : children) {
             System.out.println(child);
         }
         Thread.sleep(Long.MAX_VALUE);
     }
 
     // 获取znode的数据
     @Test
     public  void  getData()  throws  Exception {
 
         byte [] data = zkClient.getData( "/eclipse" false null );
         System.out.println( new  String(data));
 
     }
 
     // 删除znode
     @Test
     public  void  deleteZnode()  throws  Exception {
 
         // 参数2:指定要删除的版本,-1表示删除所有版本
         zkClient.delete( "/eclipse" , - 1 );
 
     }
 
     // 删除znode
     @Test
     public  void  setData()  throws  Exception {
 
         zkClient.setData( "/app1" "imissyou angelababy" .getBytes(), - 1 );
 
         byte [] data = zkClient.getData( "/app1" false null );
         System.out.println( new  String(data));
 
     }
 
}

六、分布式应用系统服务器上下线动态感知

服务器动态上下线程序的工作机制.png

  1. 服务端启动时去zookeeper集群注册信息,建立临时节点

  2. 客户端启动时就去getchildren,获取到当前在线服务器列表(并且注册监听)

  3. 当收到服务节点上下线事件通知的时候,就会再去执行process(重新再去获取服务器列表,再注册监听)


客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public  class  DistributedClient {
 
     private  static  final  String connectString =  "mini1:2181,mini2:2181,mini3:2181" ;
     private  static  final  int  sessionTimeout =  2000 ;
     private  static  final  String parentNode =  "/servers" ;
     // 注意:加volatile的意义何在?
     private  volatile  List<String> serverList;
     private  ZooKeeper zk =  null ;
 
     /**
      * 创建到zk的客户端连接
     
      * @throws Exception
      */
     public  void  getConnect()  throws  Exception {
 
         zk =  new  ZooKeeper(connectString, sessionTimeout,  new  Watcher() {
             @Override
             public  void  process(WatchedEvent event) {
                 // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                 try  {
                     //重新更新服务器列表,并且注册了监听
                     getServerList();
 
                 catch  (Exception e) {
                 }
             }
         });
     }
 
     /**
      * 获取服务器信息列表
     
      * @throws Exception
      */
     public  void  getServerList()  throws  Exception {
 
         // 获取服务器子节点信息,并且对父节点进行监听
         List<String> children = zk.getChildren(parentNode,  true );
 
         // 先创建一个局部的list来存服务器信息
         List<String> servers =  new  ArrayList<String>();
         for  (String child : children) {
             // child只是子节点的节点名
             byte [] data = zk.getData(parentNode +  "/"  + child,  false null );
             servers.add( new  String(data));
         }
         // 把servers赋值给成员变量serverList,已提供给各业务线程使用
         serverList = servers;
         
         //打印服务器列表
         System.out.println(serverList);
     }
 
     /**
      * 业务功能
     
      * @throws InterruptedException
      */
     public  void  handleBussiness()  throws  InterruptedException {
         System.out.println( "client start working....." );
         Thread.sleep(Long.MAX_VALUE);
     }
     
     public  static  void  main(String[] args)  throws  Exception {
 
         // 获取zk连接
         DistributedClient client =  new  DistributedClient();
         client.getConnect();
         // 获取servers的子节点信息(并监听),从中获取服务器信息列表
         client.getServerList();
 
         // 业务线程启动
         client.handleBussiness();
     }
}

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public  class  DistributedServer {
     private  static  final  String connectString =  "mini1:2181,mini2:2181,mini3:2181" ;
     private  static  final  int  sessionTimeout =  2000 ;
     private  static  final  String parentNode =  "/servers" ;
 
     private  ZooKeeper zk =  null ;
 
     /**
      * 创建到zk的客户端连接
     
      * @throws Exception
      */
     public  void  getConnect()  throws  Exception {
 
         zk =  new  ZooKeeper(connectString, sessionTimeout,  new  Watcher() {
             @Override
             public  void  process(WatchedEvent event) {
                 // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
                 System.out.println(event.getType() +  "---"  + event.getPath());
                 try  {
                     zk.getChildren( "/" true );
                 catch  (Exception e) {
                 }
             }
         });
 
     }
 
     /**
      * 向zk集群注册服务器信息
     
      * @param hostname
      * @throws Exception
      */
     public  void  registerServer(String hostname)  throws  Exception {
 
         String create = zk.create(parentNode +  "/server" , hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
         System.out.println(hostname +  "is online.."  + create);
 
     }
 
     /**
      * 业务功能
     
      * @throws InterruptedException
      */
     public  void  handleBussiness(String hostname)  throws  InterruptedException {
         System.out.println(hostname +  "start working....." );
         Thread.sleep(Long.MAX_VALUE);
     }
 
     public  static  void  main(String[] args)  throws  Exception {
 
         // 获取zk连接
         DistributedServer server =  new  DistributedServer();
         server.getConnect();
 
         // 利用zk连接注册服务器信息
         server.registerServer(args[ 0 ]);
 
         // 启动业务功能
         server.handleBussiness(args[ 0 ]);
 
     }
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public  class  Test {
     public  static  void  main(String[] args) {
         System.out.println( "主线程开始了" );
         Thread thread =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
                 System.out.println( "线程开始了" );
                 while ( true ){
                     
                 }
             }
         });
         thread.setDaemon( true );
         thread.start();
     }
}


补:DistributedClientLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public  class  DistributedClientLock {
     // 会话超时
     private  static  final  int  SESSION_TIMEOUT =  2000 ;
     // zookeeper集群地址
     private  String hosts =  "mini1:2181,mini2:2181,mini3:2181" ;
     private  String groupNode =  "locks" ;
     private  String subNode =  "sub" ;
     private  boolean  haveLock =  false ;
 
     private  ZooKeeper zk;
     // 记录自己创建的子节点路径
     private  volatile  String thisPath;
 
     /**
      * 连接zookeeper
      */
     public  void  connectZookeeper()  throws  Exception {
         zk =  new  ZooKeeper(hosts, SESSION_TIMEOUT,  new  Watcher() {
             public  void  process(WatchedEvent event) {
                 try  {
 
                     // 判断事件类型,此处只处理子节点变化事件
                     if  (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals( "/"  + groupNode)) {
                         //获取子节点,并对父节点进行监听
                         List<String> childrenNodes = zk.getChildren( "/"  + groupNode,  true );
                         String thisNode = thisPath.substring(( "/"  + groupNode +  "/" ).length());
                         // 去比较是否自己是最小id
                         Collections.sort(childrenNodes);
                         if  (childrenNodes.indexOf(thisNode) ==  0 ) {
                             //访问共享资源处理业务,并且在处理完成之后删除锁
                             doSomething();
                             
                             //重新注册一把新的锁
                             thisPath = zk.create( "/"  + groupNode +  "/"  + subNode,  null , Ids.OPEN_ACL_UNSAFE,
                                     CreateMode.EPHEMERAL_SEQUENTIAL);
                         }
                     }
                 catch  (Exception e) {
                     e.printStackTrace();
                 }
             }
         });
 
         // 1、程序一进来就先注册一把锁到zk上
         thisPath = zk.create( "/"  + groupNode +  "/"  + subNode,  null , Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL_SEQUENTIAL);
 
         // wait一小会,便于观察
         Thread.sleep( new  Random().nextInt( 1000 ));
 
         // 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
         List<String> childrenNodes = zk.getChildren( "/"  + groupNode,  true );
 
         //如果争抢资源的程序就只有自己,则可以直接去访问共享资源 
         if  (childrenNodes.size() ==  1 ) {
             doSomething();
             thisPath = zk.create( "/"  + groupNode +  "/"  + subNode,  null , Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL_SEQUENTIAL);
         }
     }
 
     /**
      * 处理业务逻辑,并且在最后释放锁
      */
     private  void  doSomething()  throws  Exception {
         try  {
             System.out.println( "gain lock: "  + thisPath);
             Thread.sleep( 2000 );
             // do something
         finally  {
             System.out.println( "finished: "  + thisPath);
             // 锟斤拷thisPath删锟斤拷, 锟斤拷锟斤拷thisPath锟斤拷client锟斤拷锟斤拷锟酵ㄖ�
             // 锟洁当锟斤拷锟酵凤拷锟斤拷
             zk.delete( this .thisPath, - 1 );
         }
     }
 
     public  static  void  main(String[] args)  throws  Exception {
         DistributedClientLock dl =  new  DistributedClientLock();
         dl.connectZookeeper();
         Thread.sleep(Long.MAX_VALUE);
     }
}