TreeCache
TreeCache can be seen as a combination of NodeCache and PathChildrenCache: it watches for changes across the whole tree, covering both the current node and its child nodes.
// Watch the whole tree for changes
TreeCache treeCache = new TreeCache(zkClient, getPath());
TreeCacheListener treeCacheListener = (framework, event) -> {
    System.out.println("event type: " + event.getType() + " | path: "
            + (null != event.getData() ? event.getData().getPath() : null));
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
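Leader election
Leader election here does not mean ZooKeeper's internal leader election. It means application-level leader election built on top of ZooKeeper. For example, suppose a task may only be processed by one node in a multi-node environment, and every other node that receives such a task must hand it over to that designated node. ZooKeeper can be used to elect one node as the leader, while the remaining nodes act as coordinators.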
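Once the election algorithm has run, every node ends up agreeing on one unique node as the task leader. (The similar arrangement inside ZooKeeper itself, where the leader handles writes, followers are synchronized through the Zab protocol, and both leader and followers can serve reads, belongs to the internal election that this section does not cover.) An election also commonly happens when the leader crashes unexpectedly and a new leader has to be chosen.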
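Curator offers two leader election strategies, LeaderSelector and LeaderLatch. With the former, all live clients take turns being leader without interruption. With the latter, once a leader has been elected it keeps leadership until a client going down triggers a new election.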
LeaderSelector
LeaderSelector has two core constructors:
public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
Call start() to start a LeaderSelector. Once it is running, takeLeadership() is invoked whenever this node acquires leadership.
Here is an example of using LeaderSelector:
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        // Rejoin the election automatically after leadership is released
        leaderSelector.autoRequeue();
    }

    public void start() {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    // Leadership is held for as long as this method runs; returning releases it
    @Override
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        try {
            TimeUnit.SECONDS.sleep(waitSeconds);
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " release leadership.\n");
        }
    }
}
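import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Fragment: these methods belong to a class that defines zkAddr,
// the ZooKeeper connection string.

/**
 * Create a ZooKeeper client.
 */
private CuratorFramework createClient() {
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkAddr)
            .sessionTimeoutMs(50000)
            .connectionTimeoutMs(30000)
            .retryPolicy(new ExponentialBackoffRetry(10000, 5))
            .namespace("leaderSelector")
            .build();
    return client;
}

/**
 * Start ten participants that take turns being leader.
 */
private void initLeaderSelector() throws Exception {
    List<CuratorFramework> clients = new ArrayList<>();
    List<LeaderSelectorAdapter> leaders = new ArrayList<>();
    try {
        for (int i = 0; i < 10; i++) {
            CuratorFramework client = createClient();
            clients.add(client);
            LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, "/node2", "client#" + i);
            leaders.add(selectorAdapter);
            client.start();
            selectorAdapter.start();
        }
        // Keep running until a line is entered on stdin
        new BufferedReader(new InputStreamReader(System.in)).readLine();
    } finally {
        for (LeaderSelectorAdapter leader : leaders) {
            leader.close();
        }
        for (CuratorFramework client : clients) {
            client.close();
        }
    }
}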
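Likewise, a LeaderLatch instance can register a ConnectionStateListener to watch for connection problems. When the connection fails, the leader no longer considers itself the leader. After the connection is re-established, LeaderLatch deletes its previous ZNode and creates a new one, so using a ConnectionStateListener is the recommended way to handle network jitter.
LeaderSelector users must pay close attention to connection state changes. An instance that has become leader should react to SUSPENDED and LOST. When SUSPENDED occurs, the instance must assume that it may no longer be the leader until reconnection succeeds. When LOST occurs, the instance is no longer the leader and its takeLeadership method should return.
The recommended handling is to throw CancelLeadershipException on receiving SUSPENDED or LOST. This causes the LeaderSelector instance to interrupt and cancel the thread that is executing takeLeadership. LeaderSelectorListenerAdapter.stateChanged provides this recommended logic.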
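Why takeLeadership has to respond to interruption can be shown without a ZooKeeper server. The following is only a JDK-level sketch: the thread stands in for the one running takeLeadership, and the explicit `interrupt()` call stands in for what the selector does after a CancelLeadershipException. The class and method names are hypothetical and are not part of Curator.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only simulation; no Curator classes are involved.
class InterruptibleLeadershipDemo {

    /** Returns true if the simulated leader gave up leadership after being interrupted. */
    static boolean leadershipEndsOnInterrupt() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean released = new AtomicBoolean(false);

        // Stand-in for takeLeadership(): leadership lasts while this body runs.
        Thread leaderThread = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);               // pretend to do long leader work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and fall through
            } finally {
                released.set(true);                 // leaving the method releases leadership
            }
        });

        leaderThread.start();
        started.await();
        leaderThread.interrupt();   // stand-in for the reaction to SUSPENDED or LOST
        leaderThread.join(5_000);
        return released.get() && !leaderThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("leadership released after interrupt: " + leadershipEndsOnInterrupt());
    }
}
```

A takeLeadership body that swallowed the interrupt and kept working would go on acting as leader after the connection was lost, which is the situation the recommended handling is meant to prevent.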
LeaderLatch
LeaderLatch has two commonly used constructors:
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
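Call start() to start a LeaderLatch. Once started, it coordinates through ZooKeeper with every other LeaderLatch that uses the same latchPath, and in the end exactly one of them is elected leader. hasLeadership() reports whether a LeaderLatch instance is the leader:
leaderLatch.hasLeadership(); // true means this instance is the leader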
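LeaderLatch resembles the JDK's CountDownLatch in that waiting for leadership blocks: await() does not return until this instance becomes leader. Once a LeaderLatch is no longer needed, close() must be called. If the instance is the leader, closing it releases leadership and the remaining participants elect a new leader.
Here is an example of using LeaderLatch: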
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

/**
 * Start ten latches, close the current leader, and watch another one take over.
 */
private void initLeaderLatch() throws Exception {
    List<CuratorFramework> clients = new ArrayList<>();
    List<LeaderLatch> latches = new ArrayList<>();
    try {
        for (int i = 0; i < 10; i++) {
            CuratorFramework client = createClient();
            clients.add(client);
            LeaderLatch latch = new LeaderLatch(client, "/node", "client#" + i);
            latch.addListener(new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    System.out.println("i am leader");
                }

                @Override
                public void notLeader() {
                    System.out.println("i am not leader");
                }
            });
            latches.add(latch);
            client.start();
            latch.start();
        }
        Thread.sleep(10000);
        LeaderLatch currentLeader = findLeader(latches);
        // Guard against the election not having finished within the wait
        if (currentLeader == null) {
            throw new IllegalStateException("no leader elected yet");
        }
        System.out.println("current leader is " + currentLeader.getId());
        currentLeader.close();
        Thread.sleep(10000);
        currentLeader = findLeader(latches);
        System.out.println("current leader is " + (currentLeader != null ? currentLeader.getId() : "none"));
    } finally {
        for (LeaderLatch latch : latches) {
            if (latch.getState() != null && latch.getState() != LeaderLatch.State.CLOSED) {
                latch.close();
            }
        }
        for (CuratorFramework client : clients) {
            if (client != null) {
                client.close();
            }
        }
    }
}

/** Returns the latch that currently holds leadership, or null if none does. */
private LeaderLatch findLeader(List<LeaderLatch> latches) {
    for (LeaderLatch latch : latches) {
        if (latch.hasLeadership()) {
            return latch;
        }
    }
    return null;
}
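The hand-over in the example above (close the leader, another participant takes over) can be pictured with a small in-memory model. This is only a sketch under one assumption: that the election follows the usual ZooKeeper recipe in which every participant registers a sequential node and the lowest sequence number holds leadership. It is not Curator's implementation, and `SequenceElectionDemo` with its methods is hypothetical.

```java
import java.util.TreeMap;

// In-memory model of a sequence-based election; no ZooKeeper involved.
class SequenceElectionDemo {
    // sequence number -> participant id, standing in for sequential nodes under latchPath
    private final TreeMap<Integer, String> participants = new TreeMap<>();
    private int nextSequence = 0;

    /** Registers a participant and returns the sequence number assigned to it. */
    int join(String id) {
        int sequence = nextSequence++;
        participants.put(sequence, id);
        return sequence;
    }

    /** Removes a participant, as closing a latch or losing a session would. */
    void leave(int sequence) {
        participants.remove(sequence);
    }

    /** The participant with the lowest sequence number leads; null if nobody joined. */
    String leader() {
        return participants.isEmpty() ? null : participants.firstEntry().getValue();
    }

    public static void main(String[] args) {
        SequenceElectionDemo election = new SequenceElectionDemo();
        int first = election.join("client#0");
        election.join("client#1");
        election.join("client#2");
        System.out.println("current leader is " + election.leader());
        election.leave(first);  // the leader closes
        System.out.println("current leader is " + election.leader());
    }
}
```

Because leadership depends only on which registration is the oldest one still present, it stays with one participant until that participant leaves, which matches the LeaderLatch behavior described earlier.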
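Distributed locks
Reentrant lock
Curator's reentrant lock is similar to the JDK's ReentrantLock. Reentrant means that the same thread of a client can acquire the lock again while it already holds it, without blocking. It is implemented by the class InterProcessMutex.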
// Create the lock
InterProcessMutex lock = new InterProcessMutex(zkClient, path);
// Acquire outside the try block so that release() only runs when the lock is held
lock.acquire();
try {
    // TODO business logic
} finally {
    lock.release();
}
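Since the text compares InterProcessMutex to the JDK's ReentrantLock, the reentrancy itself can be tried locally with the JDK class. The sketch below is only an analogy: `ReentrantLock` stands in for the distributed lock, and the class name `ReentrantHoldDemo` is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// JDK analogy only: ReentrantLock stands in for InterProcessMutex,
// which needs a running ZooKeeper ensemble.
class ReentrantHoldDemo {

    /** Acquires the same lock twice on one thread and returns the hold count seen inside. */
    static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                // first acquisition
        try {
            lock.lock();            // re-entry by the owning thread does not block
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();      // balances the inner acquisition
            }
        } finally {
            lock.unlock();          // balances the outer acquisition
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count inside nested section: " + nestedHoldCount());
    }
}
```

The same discipline applies to the distributed lock: every acquire() needs a matching release(), so nested acquisitions are best paired with nested try/finally blocks.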
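Non-reentrant lock
Compared with InterProcessMutex above, this lock simply lacks reentrancy, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used in much the same way as InterProcessMutex.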
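InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, path);
// acquire(time, unit) returns false on timeout, so check the result
// before entering the critical section
if (!lock.acquire(3, TimeUnit.SECONDS)) {
    throw new IllegalStateException("acquire lock timed out");
}
try {
    // TODO business logic
} finally {
    lock.release();
}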
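What "cannot be re-acquired within the same thread" means in practice can be seen with a one-permit JDK `Semaphore`. This is only a local analogy for the behavior, not a statement about how Curator implements the class, and `NonReentrantDemo` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// JDK analogy only: a one-permit Semaphore behaves like a non-reentrant mutex.
class NonReentrantDemo {

    /** Tries to take the same one-permit semaphore twice from a single thread. */
    static boolean secondAcquireSucceeds() throws InterruptedException {
        Semaphore mutex = new Semaphore(1);
        mutex.acquire();                    // first acquisition takes the only permit
        try {
            // The holder asks again: no permit is left, so this times out.
            boolean again = mutex.tryAcquire(100, TimeUnit.MILLISECONDS);
            if (again) {
                mutex.release();
            }
            return again;
        } finally {
            mutex.release();                // give the permit back
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second acquire succeeded: " + secondAcquireSucceeds());
    }
}
```

With a non-reentrant lock, an untimed second acquire on the holding thread would wait forever, which is why the timed acquire and the check of its return value matter here.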
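Reentrant read-write lock
Curator's reentrant read-write lock is similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks, one for read operations and one for write operations. While the write lock is not held, the read lock can be held by multiple processes at the same time. While the write lock is held, reads are not allowed and block.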
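This lock is reentrant. A thread holding the write lock can also enter the read lock, but a thread holding the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock: acquire write lock -> acquire read lock -> release write lock, after which the read lock is still held until it is released in turn. Upgrading from a read lock to the write lock is not possible.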
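The reentrant read-write lock is built from two classes, InterProcessReadWriteLock and InterProcessMutex. First create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed. Both locks are of type InterProcessMutex.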
// Create the lock
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zkClient, path);
// Obtain the read lock
InterProcessMutex readLock = lock.readLock();
// Obtain the write lock
InterProcessMutex writeLock = lock.writeLock();
// The write lock must be acquired before the read lock, never the other way round
if (!writeLock.acquire(3, TimeUnit.SECONDS)) {
    throw new IllegalStateException("acquire writeLock err");
}
try {
    if (!readLock.acquire(3, TimeUnit.SECONDS)) {
        throw new IllegalStateException("acquire readLock err");
    }
    try {
        // TODO business logic
    } finally {
        readLock.release();
    }
} finally {
    // Each lock is released only if it was actually acquired
    writeLock.release();
}
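The downgrade and no-upgrade rules can be checked locally with the JDK's ReentrantReadWriteLock, which the text names as the model for Curator's lock. The sketch is an analogy only, and `DowngradeDemo` with its methods is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// JDK analogy only: ReentrantReadWriteLock stands in for InterProcessReadWriteLock.
class DowngradeDemo {

    /** write -> read -> release write: the thread ends up holding only the read lock. */
    static boolean canDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        boolean gotRead = rw.readLock().tryLock();  // allowed for the write holder
        rw.writeLock().unlock();                    // downgrade: give up the write lock
        boolean stillReading = gotRead
                && rw.getReadHoldCount() == 1
                && !rw.isWriteLocked();
        if (gotRead) {
            rw.readLock().unlock();
        }
        return stillReading;
    }

    /** read -> write on one thread: the write lock is refused, so there is no upgrade. */
    static boolean canUpgrade() throws InterruptedException {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean gotWrite = rw.writeLock().tryLock(100, TimeUnit.MILLISECONDS);
            if (gotWrite) {
                rw.writeLock().unlock();
            }
            return gotWrite;
        } finally {
            rw.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("downgrade works: " + canDowngrade());
        System.out.println("upgrade works: " + canUpgrade());
    }
}
```

The timed tryLock in `canUpgrade` is deliberate: an untimed attempt to take the write lock while holding the read lock would block indefinitely.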
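Conclusion
This article covered installing ZooKeeper and using it in practice, including CRUD operations with the Java Curator framework, distributed coordination services, distributed locks, and more.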