需求
Leader Election
https://curator.apache.org/getting-started.html
Case
官方 Demo
https://github.com/apache/curator/tree/master/curator-examples/src/main/java/leader
Code
我们自己也来写一个 Demo 吧
<!-- Curator recipes (provides the LeaderSelector recipe used by the demo). -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <!-- Drop the transitive ZooKeeper client so the explicitly declared version below is used. -->
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Explicit ZooKeeper client version (presumably chosen to match the server; confirm against your cluster). -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class LeaderSelectorTest { private static final String CONNECT_STR="192.168.126.135:2181"; private static RetryPolicy retryPolicy=new ExponentialBackoffRetry( 5*1000, 10 ); private static CuratorFramework curatorFramework; private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { String appName = System.getProperty("appName"); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy); LeaderSelectorTest.curatorFramework = curatorFramework; curatorFramework.start(); LeaderSelectorListener listener = new LeaderSelectorListenerAdapter(){ // 选举 leader后的回调 @Override public void takeLeadership(CuratorFramework client) throws Exception { // this callback will get called when you are the leader // do whatever leader work you need to and only exit // this method when you want to relinquish leadership System.out.println(" LEADER . 【"+appName +"】, Pre Warm Cache "); // 模拟业务耗时操作 TimeUnit.SECONDS.sleep(5); } }; LeaderSelector selector = new LeaderSelector(curatorFramework, "/cache_warmer_leader", listener); selector.autoRequeue(); // not required, but this is behavior that you will probably expect selector.start(); countDownLatch.await(); } }
zk中的数据
源码分析
我们来看一下 Curator 是怎么实现 Leader 选举的。