- zk可以使得分布式系统能够协同工作。
- zookeeper名字由来:协调现有系统(hadoop,pig...),zookeeper,动物管理员。
- 不适用:
- 海量数据存储,znode最大存储1MB的数据。
- ZK满足
- 一致性C
- 可用性A
- 不满足,分区容错性P
# Zk api
<!--原生API客户端--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency>
- zk节点可以有数据,也可以没有数据
- 数据在节点被存储为字节数组
- zkApi暴露的方法:
- create /path data
- 创建一个名为/path的znode节点,且包含数据data
- deleta /path
- 删除名为/path的znode节点
- exists /path
- 判断是否存在名为/path的节点
- setData /path data
- 设置名为/path的znode的数据为data
- getData /path
- 获取名为/path的节点的数据
- getChildren /path
- 获取节点名为/path的所有子节点列表
znode的类型,节点类型,mode
- 持久节点persistent
- 只能通过delete语句来删除
- 临时节点ephemeral
- 创建该节点的客户端崩溃或关闭了与zk的连接,这个节点就会被删除,或者被主动delete删除
- 临时节点znode不能有子节点
- znode⼀共有4种类型:持久的(persistent)、临时的(ephemeral)、持久有序的(persistent_sequential)和临时有序的(ephemeral_sequential)
监听与通知
- 客户端向zookeeper注册需要监听的znode,通过对znode设置watch来接收通知。
- 每次设置watch监听,只会被触发一次,所以客户端需要在每次接收通知之后再次对监听的znode注册watch。
版本
- 每个znode都有一个版本号,随着每次数据的变化而自增。乐观锁
- zk容许(集群/2)-1个服务器崩溃,且集群数量最好是奇数个
开始使用zookeeper
- conf/zoo.cfg 配置文件
- bin/zKserver.sh start 启动zk服务器
- bin/zKCli.sh 创建zk客户端会话
- 配置集群(也可在同一台服务器上搭建集群)
tickTime=2000 initLimit=10 syncLimit=5 dataDir=./data clientPort=2181 server.1=127.0.0.1:2222:2223 server.2=127.0.0.1:3333:3334 server.3=127.0.0.1:4444:4445
- 每⼀个server.n项指定了编号为n的ZooKeeper服务器使⽤的地址和端口号。每个server.n项通过冒号分隔为三部分,第⼀部分为服务器n的IP地址或主机名(hostname),第⼆部分和第三部分为TCP端⼜号,分别⽤于仲裁通信和群⾸选举.
- 当启动⼀个服务器时,我们需要知道启动的是哪个服务器。⼀个服务器通过读取data⽬录下⼀个名为myid的⽂件来获取服务器ID信息
使用Zookeeper进行开发
- 客户端连接zk,创建句柄
ZooKeeper( String connectString, int sessionTimeout, Watcher watcher)
- connectString: zk主机名和端口 localhost:2181,localhost:2182
- sessionTimeout: 以毫秒为单位,zk等待客户端通信的最长时间,推荐设置为5-10秒
- watcher: 用于接收通知事件的对象
- 当zk服务端宕机之后,客户端会自动发起重连,直到zk服务端重新启动,客户端再次连接上。
- 客户端可以通过close()关闭与zk服务器的连接,否则需要等到一段时间,直到超过了会话超时时间才会下线
原子操作
Multiop可以原⼦性地执⾏多个ZooKeeper的操作,执⾏过程为原⼦性,即在multiop代码块中的所有操作要不全部成功,要不全部失败
- 使用
multi(Ops)
public class Atomic { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zooKeeper = new ZooKeeper("localhost:2181,localhost:2182,localhost:2183", 2000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent); } }); ArrayList<Op> ops = new ArrayList<>(); ops.add(Op.create("/atomic", "原子节点".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); // 整个操作会失败,因为/atomic节点已经创建了,再次创建会失败 ops.add(Op.create("/atomic", "原子节点".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); ops.add(Op.create("/atomic/node1", "原子节点".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); List<OpResult> opResults = zooKeeper.multi(ops); opResults.forEach(System.out::println); } }