手写zookeeper来模拟dubbo的注册/发现

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介:

zookeeper可能平时大家直接操作的并不多,而zookeeper的要点就在于4个节点状态(永久,有序,临时,临时有序)以及1个watcher的监控.

1.模拟注册:

pom引用(本人使用的zookeeper为3.4.6)

<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>


<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>


application.yml的内容为

server:

port: 8081
address: localhost

zookeeper:

address: 192.168.5.129:2181
sessionouttime: 4000

spring:

application:
    name: zk

先写一个注册中心

/**

  • 注册中心,对外提供注册服务
  • Created by Administrator on 2018/9/12.
    */

@Component
public class ZookeeperServer {

private ZooKeeper zk;

public ZooKeeper getConnection(String host,Watcher watcher) throws IOException {
    zk = new ZooKeeper(host, 500, watcher);
    return zk;
}

}
再写一个提供者注册类向Zookeeper注册

@Component
public class ZookRegister implements Watcher {

//获得配置资源
@Autowired
Environment env;

@Autowired
private ZookeeperServer zkServer ;
private ZooKeeper zk;
//固定的根目录比如公司名
final String fixedpath = "/guanjian";

@Value("spring.application.name")
String servername;

//spring容器初始化ZookRegister的实例时执行
@PostConstruct
public void register() throws Exception {
    String servername = env.getProperty("spring.application.name");
    String port = env.getProperty("server.port");
    String ip = env.getProperty("server.address");
    String address = env.getProperty("zookeeper.address");

// PrivderServer.zooKeeper = zook.create();
// ZooKeeper zooKeeper = PrivderServer.zooKeeper;

    this.zk = zkServer.getConnection(address ,this);
    Stat existsFixedpath = this.zk.exists(fixedpath, false);
    if (existsFixedpath == null) {
        //参数分别是创建的节点路径、节点的数据、权限(此处对所有用户开放)、节点的类型(此处是持久节点)
        zk.create(fixedpath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    String svnode = fixedpath + "/" + servername;
    Stat existsSvnode = zk.exists(svnode, false);
    //create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
    if (existsSvnode == null) {
        zk.create(svnode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    if (ip == null || "".equals(null)) {
        //如果配置文件中没有指定服务ip获取本机ip
        ip = InetAddress.getLocalHost().getHostAddress();
    }
    if (port == null || "".equals(null)) {
        port = "8080";
    }
    NodeStat nodeStat = new NodeStat();
    nodeStat.setIp(ip);
    nodeStat.setPort(port);
    nodeStat.setName(servername);
    nodeStat.setNum(0);
    nodeStat.setStatus(Status.wait);
    //临时节点的前缀是服务名称,节点数据是服务address
    String svipmlNode = fixedpath + "/" + servername + "/" + servername;
    //重点在于这里创建的是临时有序节点
    zk.create(svipmlNode, JSONObject.toJSONString(nodeStat).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}

@Override
public void process(WatchedEvent watchedEvent) {

}

}
其中NodeStat,Status类如下

@Data
public class NodeStat implements Serializable {

private String ip;
private String name;
private String port;
private Integer num;
private String status;
private String node;
private String client;

}
public class Status {

//wait无消费者,run运行中,stop禁用中
public static final String run = "run";
public static final String wait = "wait";
public static final String stop = "stop";

}
启动Springboot

@SpringBootApplication
public class ZkApplication {

public static void main(String[] args) throws InterruptedException {

  ApplicationContext applicationContext = SpringApplication.run(ZkApplication.class, args);
  Thread.sleep(Long.MAX_VALUE);

}
}
因为有一个长时间的休眠,当我们进入zookeeper查询的时候,我们会发现在/guanjian/zk下多了一个zk0000000004的节点.

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper, guanjian]
[zk: localhost:2181(CONNECTED) 1] ls /guanjian
[zk]
[zk: localhost:2181(CONNECTED) 2] ls /guanjian/zk
[]
[zk: localhost:2181(CONNECTED) 3] ls /guanjian/zk
[zk0000000004]
[zk: localhost:2181(CONNECTED) 4] get /guanjian/zk/zk0000000004
{"ip":"localhost","name":"zk","num":0,"port":"8081","status":"wait"}
cZxid = 0x2b
ctime = Thu Sep 13 11:20:03 CST 2018
mZxid = 0x2b
mtime = Thu Sep 13 11:20:03 CST 2018
pZxid = 0x2b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x165cc5492840010
dataLength = 68
numChildren = 0

当我们手动结束main方法后,再查询zookeeper(注:/guanjian/zk是永久节点)

[zk: localhost:2181(CONNECTED) 5] ls /guanjian/zk
[]

我们发现这个节点没有了,即这个临时节点消失.

2.模拟发现:

发现为在/guanjian/zk下随机获取一个临时排序节点作为我们要用的注册进来的服务,以进行后续操作,并更新这个节点的状态为run.

@Component
public class ClientComsumer implements Watcher {

//本地缓存服务列表
private static Map<String, List<String>> servermap;
@Autowired
private ZookeeperServer zkServer ;
private ZooKeeper zk;
@Autowired
Environment env;

@PostConstruct
private void init() throws IOException {
    String address = env.getProperty("zookeeper.address");
    this.zk = zkServer.getConnection(address,this);
}

private List<String> getNodeList(String serverName) throws KeeperException, InterruptedException, IOException {
    if (servermap == null) {
        servermap = new HashMap<>();
    }
    Stat exists = null;
    try {
        String s = "/guanjian/" + serverName;
        exists = zk.exists(s,this);
    } catch (Exception e) {
    }

    //判断是否存在该服务
    if (exists == null) return null;
    List<String> serverList = servermap.get(serverName);
    if (serverList != null && serverList.size() > 0) {
        return serverList;
    }
    List<String> children = zk.getChildren("/guanjian/" + serverName,this);
    List<String> list = new ArrayList<>();
    for (String s : children) {
        byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
        String datas = new String(data);
        NodeStat nodeStat = JSONObject.parseObject(datas, NodeStat.class);
        if (!Status.stop.equals(nodeStat.getStatus())) {
            list.add(datas);
        }
    }
    servermap.put(serverName, list);
    return list;
}

public String getServerinfo(String serverName) throws KeeperException, InterruptedException, IOException {
    try {
        List<String> nodeList = getNodeList(serverName);
        if (nodeList == null|| nodeList.size()<1) {
            return null;
        }
        //这里使用得随机负载策略,如需需要自己可以实现其他得负载策略
        String snode = nodeList.get((int) (Math.random() * nodeList.size()));
        NodeStat nodeStat = JSONObject.parseObject(snode, NodeStat.class);
        List<String> children = zk.getChildren("/guanjian/" + serverName,this);
        //随机负载后,将随机取得节点后的状态更新为run
        for (String s : children) {
            byte[] data = zk.getData("/guanjian/" + serverName + "/" + s, this, null);
            String datas = new String(data);
            if (snode.equals(datas)) {
                nodeStat.setStatus(Status.run);
                zk.setData("/guanjian/" + serverName + "/" + s,JSONObject.toJSONString(nodeStat).getBytes(),0);
                break;
            }
        }
        return JSONObject.toJSONString(nodeStat);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

@Override
public void process(WatchedEvent watchedEvent) {
    //如果服务节点数据发生变化则清空本地缓存
    if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) {
        servermap = null;
    }
}

}
调整main方法

@SpringBootApplication
public class ZkApplication {

public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

  ApplicationContext applicationContext = SpringApplication.run(ZkApplication.class, args);
  ClientComsumer getServer = applicationContext.getBean(ClientComsumer.class);
  System.out.println(getServer.getServerinfo("zk"));
  Thread.sleep(Long.MAX_VALUE);

}
}
查看zookeeper内容如下:

[zk: localhost:2181(CONNECTED) 6] ls /guanjian/zk
[zk0000000005]
[zk: localhost:2181(CONNECTED) 7] get /guanjian/zk/zk0000000005
{"ip":"localhost","name":"zk","num":0,"port":"8081","status":"run"}
cZxid = 0x31
ctime = Thu Sep 13 13:41:27 CST 2018
mZxid = 0x32
mtime = Thu Sep 13 13:41:27 CST 2018
pZxid = 0x31
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x165cc5492840013
dataLength = 67
numChildren = 0

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
存储 负载均衡 监控
dubbo学习一:zookeeper与dubbo的关系,下载安装启动zookeeper(解决启动中报错)
这篇文章是关于Apache Dubbo框架与Zookeeper的关系,以及如何下载、安装和启动Zookeeper的教程,包括解决启动过程中可能遇到的报错问题。
137 3
dubbo学习一:zookeeper与dubbo的关系,下载安装启动zookeeper(解决启动中报错)
|
3月前
|
Dubbo 应用服务中间件 Apache
Dubbo 应用切换 ZooKeeper 注册中心实例,流量无损迁移
如果 Dubbo 应用使用 ZooKeeper 作为注册中心,现在需要切换到新的 ZooKeeper 实例,如何做到流量无损?
44 4
|
3月前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
238 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
5月前
|
Dubbo 网络协议 Java
深入掌握Dubbo服务提供者发布与注册原理
该文章主要介绍了Dubbo服务提供者发布与注册的原理,包括服务发布的流程、多协议发布、构建Invoker、注册到注册中心等过程。
深入掌握Dubbo服务提供者发布与注册原理
|
6月前
|
Nacos 微服务
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
106 4
|
6月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
146 3
|
7月前
|
缓存 NoSQL 数据库
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
121 0
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
27天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论