zookeeper内部原理流程
用processon画的有点大,看不清的可以放大查看
流程图详解
1.zookeeper 集群(至少个节点)启动后,会自动选举出一个leader,其他节点为follower 跟随节点 2.client 连接给定的单地址或者集群,连接集群时,会挑选一个可用的节点进行连接,整个节点可能时leader,也可能时follower 3.如果是读请求,则直接从当前连接的节点获取指定的service实例 4.如果时写请求 4.1.如果当前连接的是leader,则leader会将信息同步到所有其他节点并且得到ack反馈后commit消息,返回给client 成功 4.2.如果当前连接的是follower,则会先把请求转发给leader,然后会继续4.1的流程
java curator 操作 zookeeper 集群
java curator maven pom
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery-server</artifactId> <version>5.2.1</version> </dependency>
java curator client操作集群注册
@Component public class RegisterZookeeper implements ApplicationRunner { @Value("${zookeeper.address}") private String zkAddress; @Override public void run(ApplicationArguments args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, new RetryOneTime(1000)); client.start(); client.blockUntilConnected(); ServiceInstance<Object> instance = ServiceInstance.builder().name("test1").address("10.188.17.28").port(7777).build(); ServiceDiscovery<Object> serviceDiscovery = ServiceDiscoveryBuilder.builder(Object.class).client(client) .basePath("/zookeeper-test").build(); serviceDiscovery.registerService(instance); serviceDiscovery.start(); System.out.println("service register ok"); } }
zkAddress 是配置文件里的zookeeper集群
zookeeper: address: hadoop1:2181,hadoop2:2181,hadoop3:2181
集群内查看注册信息
随便找一个节点进入后zookeeper 目录后 sh bin/zkCli.sh -server hadoop1:2181
然后ls -/ 查看全部注册信息
刚刚代码注册的实例信息
public class ZookeeperTester { private static ZooKeeper Zk; public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("hadoop1:2181,hadoop1:2181,hadoop1:2181", new RetryOneTime(1000)); client.start(); client.blockUntilConnected(); ServiceDiscovery<Object> serviceDiscovery = ServiceDiscoveryBuilder.builder(Object.class).client(client) .basePath("/zookeeper-test").build(); Collection<ServiceInstance<Object>> list = serviceDiscovery.queryForInstances("test1"); list.forEach((instance)->{ String servicePath = instance.getAddress()+":"+instance.getPort(); System.out.println(servicePath); }); Thread.sleep(10000); } }
代码运行结果
获取到了刚刚注册的实例test1