零、创建 SpringBoot 项目
Lombok
SpringWeb
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.2.2.RELEASE</version>
</plugin>
</plugins>
</build>
一、Java 客户端(MAVEN 依赖)
(1) ZooKeeper 自带的客户端
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
(2) Apache 的开源客户端 Curator
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
(3) Apache 开源的 zkclient(旧)
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
二、Java 客户端 API
(1) 创建 Java 与 ZooKeeper 的连接会话
创建 ZooKeeper 对象,放入 IoC 容器即可。
① Java 代码
@Configuration
public class ZooKeeperClient {
// 要连接的 ZooKeeper 所在的服务器地址和端口号, 多个服务器地址用【逗号】分隔
private static final String CONNECT_STRING = "192.168.80.128:2888:3888,192.168.80.128:2888:3888,192.168.80.128:2889:3889";
// 会话超时时间 (单位: 毫秒)
private static final int SESSION_TIMEOUT = 30000; // 30s
@Bean
public ZooKeeper zooKeeper() throws Exception {
return new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
System.out.println("event = " + event);
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("Java 连接 ZooKeeper 成功!");
}
}
});
}
}
② 若无法连接成功
若无法连接成功,尝试关闭防火墙。
③ Linux 防火墙相关命令
a. 启动防火墙
systemctl start firewalld
b. 关闭防火墙
systemctl stop firewalld
c. 关闭防火墙
systemctl stop firewalld
d. 重启防火墙
firewall-cmd --reload
e. 查看防火墙状态
systemctl status firewalld
(2) 创建节点
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 创建节点
*
* @param path 路径
* @param data 数据
* @param nodeType 节点类型
* (常用取值: PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL)
* @return 新创建的节点的路径
*/
@PostMapping("/create")
public String create(String path, String data, String nodeType) throws Exception {
return zooKeeper.create(path,
data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.valueOf(nodeType));
}
}
(3) 获取节点中的数据
① 同步获取
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 同步获取指定节点下的数据
*/
@GetMapping("/get")
public String get(String path) throws Exception {
return path + " 节点下的数据: " +
new String(zooKeeper.getData(path, false,
getStatVersionInfo(path)));
}
/**
* 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
}
② 异步回调
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 异步获取指定节点下的数据
*/
@GetMapping("/getAsync")
public String getAsync(String path) throws Exception {
String statString = getStatVersionInfo(path).toString();
zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("数据:" + new String(data));
System.out.println("context:" + ctx);
}
}, path + "_" + statString);
return "获取 " + path + " 路径(节点)下的数据";
}
/**
* 查询指定路径的节点的状态(版本)信息 (版本不存在返回 null)
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
}
③ 获取子节点列表
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 获取指定路径下的子节点列表
*/
@GetMapping("/listChildNodes")
public List<String> listChildNodes(String path) throws Exception {
return zooKeeper.getChildren(path, false);
}
}
(4) 删除节点
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 删除节点
*/
@PostMapping("/delete")
public String delete(String path) throws Exception {
Stat statVersionInfo = getStatVersionInfo(path);
if (statVersionInfo == null) return "节点不存在";
zooKeeper.delete(path, statVersionInfo.getVersion());
return getStatVersionInfo(path) == null ? "删除成功" : "删除失败";
}
/**
* 查询指定路径的节点的状态(版本)信息
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
}
(5) 更新数据
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 同步获取指定节点下的数据
*/
@GetMapping("/get")
public String get(String path) throws Exception {
return path + " 节点下的数据: " +
new String(zooKeeper.getData(path, false,
getStatVersionInfo(path)));
}
/**
* 更新节点数据
*/
@PostMapping("/updateNodeData")
public Map<String, String> updateNodeData(String path, String data) throws Exception {
Map<String, String> resMap = new HashMap<>();
Stat oldStat = getStatVersionInfo(path);
if (oldStat == null) {
resMap.put("msg", "节点不存在");
return resMap;
}
// 查询节点数据
String curNodeData = get(path);
Stat stat = zooKeeper.setData(path, data.getBytes(), oldStat.getVersion());
if (oldStat.getVersion() != stat.getVersion()) {
resMap.put("msg", "更新节点数据成功");
resMap.put("oldData", curNodeData);
resMap.put("newData", data);
} else {
resMap.put("msg", "更新节点数据失败");
}
return resMap;
}
/**
* 查询指定路径的节点的状态(版本)信息
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
}
(6) 事件处理
ZooKeeper 中的 get 命令可查看节点中存储的数据,并绑定【节点数据改变事件】(是一次性事件)
ZooKeeper 中的 list 命令可查看子节点列表,并绑定【节点改变事件】(是一次性事件)
① 绑定一次性事件
a. 获取节点数据绑定的节点改变事件
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 查询指定路径的节点的状态(版本)信息
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
/**
* 获取节点数据绑定的一次性节点数据改变事件【NodeDataChanged】
*/
@GetMapping("/addWatchByGet")
public String addWatchByGet(String path) throws Exception {
Stat statVersion = getStatVersionInfo(path);
if (statVersion == null) return "节点不存在";
zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("【addWatchByGet】触发节点类型:" + event.getType());
}
}, statVersion);
return "获取节点数据绑定的一次性节点数据改变事件";
}
}
b. 获取子节点列表绑定的子节点改变事件
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 查询指定路径的节点的状态(版本)信息
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
/**
* 获取子节点列表绑定的一次性子节点改变事件【NodeChildrenChanged】
*/
@GetMapping("/addWatchByGetChildren")
public String addWatchByGetChildren(String path) throws Exception {
Stat statVersion = getStatVersionInfo(path);
if (statVersion == null) return "节点不存在";
zooKeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("【addWatchByGetChildren】触发节点类型:" + event.getType());
}
});
return "获取子节点列表绑定的一次性子节点改变事件";
}
}
② 绑定永久事件
监听当前节点的数据改变和当前节点(路径)的子节点的创建和删除,子节点数据发生改变不会触发事件监听。
@RestController
@RequestMapping("/zookeepers")
public class ZookeeperController {
@Autowired
private ZooKeeper zooKeeper;
/**
* 查询指定路径的节点的状态(版本)信息
*/
public Stat getStatVersionInfo(String path) throws Exception {
return zooKeeper.exists(path, false);
}
/**
* 给指定路径(节点)绑定永久事件 (1. 子节点改变事件; 2. 节点数据改变事件)
*/
@PostMapping("/addWatch")
public String addWatch(String path) throws Exception {
Stat statVersion = getStatVersionInfo(path);
if (statVersion == null) return "节点不存在";
zooKeeper.addWatch(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
Event.EventType eventType = event.getType();
if (eventType == Event.EventType.NodeChildrenChanged) {
System.out.println("触发了子节点改变事件");
// 重新获取子节点列表的代码
} else if (eventType == Event.EventType.NodeDataChanged) {
System.out.println("触发了 NodeDataChanged");
// 重新获取节点数据的代码
}
}
}, AddWatchMode.PERSISTENT);
return "绑定永久事件";
}
}
③ 递归绑定事件
- 给当前节点及其子节点都绑定节点改变事件和节点数据改变事件
- 与绑定永久事件的代码的不同点是 addWatch 的最后一个参数不一样,绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT,递归绑定永久事件的最后一个参数是 AddWatchMode.PERSISTENT_RECURSIVE