大数据Zookeeper组件 2

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
数据可视化 DataV,5个大屏 1个月
云原生网关 MSE Higress,422元/月
简介: 大数据Zookeeper组件

3: Zookeeper安装

3.1 集群规划

服务器IP 主机名 myid的值
192.168.174.100 node01 1

192.168.174.110 node02 2
192.168.174.120 node03 3

第一步:下载zookeeeper的压缩包,下载网址如下

http://archive.apache.org/dist/zookeeper/


我们在这个网址下载我们使用的zk版本为3.4.9


下载完成之后,上传到我们的linux的/export/softwares路径下准备进行安装


第二步:解压


解压zookeeper的压缩包到/export/servers路径下去,然后准备进行安装

cd /export/software
tar -zxvf zookeeper-3.4.9.tar.gz -C ../servers/ 

第三步:修改配置文件

第一台机器修改配置文件

cd /export/servers/zookeeper-3.4.9/conf/
cp zoo_sample.cfg zoo.cfg
mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/

vim zoo.cfg

dataDir=/export/servers/zookeeper-3.4.9/zkdatas
# 保留多少个快照
autopurge.snapRetainCount=3
# 日志多少小时清理一次
autopurge.purgeInterval=1
# 集群中服务器地址
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

第四步:添加myid配置


在第一台机器的


/export/servers/zookeeper-3.4.9/zkdatas /这个路径下创建一个文件,文件名为myid ,文件内容为1


echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid


第五步:安装包分发并修改myid的值


安装包分发到其他机器


第一台机器上面执行以下两个命令


scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/


scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/


第二台机器上修改myid的值为2


echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第三台机器上修改myid的值为3

echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第六步:三台机器启动zookeeper服务

三台机器启动zookeeper服务

这个命令三台机器都要执行

/export/servers/zookeeper-3.4.9/bin/zkServer.sh start

查看启动状态

/export/servers/zookeeper-3.4.9/bin/zkServer.sh status


4 Zookeeper的Shell 客户端操作

命令 说明 参数
create [-s] [-e] path data acl 创建Znode -s 指定是顺序节点
-e 指定是临时节点
ls path [watch] 列出Path下所有子Znode
get path [watch] 获取Path对应的Znode的数据和属性
ls2 path [watch] 查看Path下所有子Znode以及子Znode的属性
set path data [version] 更新节点 version 数据版本
delete path [version] 删除节点, 如果要删除的节点有子Znode则无法删除 version 数据版本
rmr path 删除节点, 如果有子Znode则递归删除
setquota -n|-b val path 修改Znode配额 -n 设置子节点最大个数
-b 设置节点数据最大长度
history 列出历史记录

1:创建普通节点

create /app1 hello

2: 创建顺序节点

create -s /app3 world

3:创建临时节点

create -e /tempnode world

4:创建顺序的临时节点

create -s -e /tempnode2 aaa

5:获取节点数据

get /app1

6:修改节点数据

set /app1 xxx

7:删除节点

delete /app1 删除的节点不能有子节点

rmr /app1 递归删除

Znode 的特点

文件系统的核心是 Znode

如果想要选取一个 Znode, 需要使用路径的形式, 例如 /test1/test11

Znode 本身并不是文件, 也不是文件夹, Znode 因为具有一个类似于 Name 的路径, 所以可以从逻辑上实现一个树状文件系统

ZK 保证 Znode 访问的原子性, 不会出现部分 ZK 节点更新成功, 部分 ZK 节点更新失败的问题

Znode 中数据是有大小限制的, 最大只能为1M

Znode是由三个部分构成

stat: 状态, Znode的权限信息, 版本等

data: 数据, 每个Znode都是可以携带数据的, 无论是否有子节点

children: 子节点列表

Znode 的类型


每个Znode有两大特性, 可以构成四种不同类型的Znode

持久性

持久 客户端断开时, 不会删除持有的Znode

临时 客户端断开时, 删除所有持有的Znode, 临时Znode不允许有子Znode

顺序性

有序 创建的Znode有先后顺序, 顺序就是在后面追加一个序列号, 序列号是由父节点管理的自增

无序 创建的Znode没有先后顺序

Znode的属性

dataVersion 数据版本, 每次当Znode中的数据发生变化的时候, dataVersion都会自增一下

cversion 节点版本, 每次当Znode的节点发生变化的时候, cversion都会自增

aclVersion ACL(Access Control List)的版本号, 当Znode的权限信息发生变化的时候aclVersion会自增

zxid 事务ID

ctime 创建时间

mtime 最近一次更新的时间

ephemeralOwner 如果Znode为临时节点, ephemeralOwner表示与该节点关联的SessionId

通知机制


通知类似于数据库中的触发器, 对某个Znode设置 Watcher, 当Znode发生变化的时候, WatchManager会调用对应的Watcher

当Znode发生删除, 修改, 创建, 子节点修改的时候, 对应的Watcher会得到通知

Watcher的特点

一次性触发 一个 Watcher 只会被触发一次, 如果需要继续监听, 则需要再次添加 Watcher

事件封装: Watcher 得到的事件是被封装过的, 包括三个内容 keeperState, eventType, path

KeeperState EventType 触发条件 说明
None 连接成功
SyncConnected NodeCreated Znode被创建 此时处于连接状态
SyncConnected NodeDeleted Znode被删除 此时处于连接状态
SyncConnected NodeDataChanged Znode数据被改变 此时处于连接状态
SyncConnected NodeChildChanged Znode的子Znode数据被改变 此时处于连接状态
Disconnected None 客户端和服务端断开连接 此时客户端和服务器处于断开连接状态
Expired None 会话超时 会收到一个SessionExpiredException
AuthFailed None 权限验证失败 会收到一个AuthFailedException

会话

  • 在ZK中所有的客户端和服务器的交互都是在某一个Session中的, 客户端和服务器创建一个连接的时候同时也会创建一个Session
  • Session会在不同的状态之间进行切换: CONNECTING, CONNECTED, RECONNECTING, RECONNECTED, CLOSED
  • ZK中的会话两端也需要进行心跳检测, 服务端会检测如果超过超时时间没收到客户端的心跳, 则会关闭连接, 释放资源, 关闭会话

5. zookeeper的数据模型

  • ZooKeeper 的数据模型,在结构上和标准文件系统的非常相似,拥有一个层

次的命名空间,都是采用树形层次结构.

ZooKeeper 树中的每个节点被称为—个Znode。和文件系统的目录树一样,ZooKeeper 树中的每个节点可以拥有子节点。

但也有不同之处:


Znode 兼具文件和目录两种特点。既像文件一样维护着数据、元信息、ACL、 时间戳等数据结构,又像目录一样可以作为路径标识的一部分,并可以具有 子 Znode。用户对 Znode 具有增、删、改、查等操作(权限允许的情况下)。

Znode 存储数据大小有限制。ZooKeeper 虽然可以关联一些数据,但并没有 被设计为常规的数据库或者大数据存储,相反的是,它用来管理调度数据, 比如分布式应用中的配置文件信息、状态信息、汇集位置等等。这些数据的 共同特性就是它们都是很小的数据,通常以 KB 为大小单位。ZooKeeper 的服 务器和客户端都被设计为严格检查并限制每个 Znode 的数据大小至多 1M,常规使用中应该远小于此值。

Znode 通过路径引用,如同 Unix 中的文件路径。路径必须是绝对的,因此他 们必须由斜杠字符来开头。除此以外,他们必须是唯一的,也就是说每一个 路径只有一个表示,因此这些路径不能改变。在 ZooKeeper 中,路径由 Unicode 字符串组成,并且有一些限制。字符串"/zookeeper"用以保存管理 信息,比如关键配额信息。

每个 Znode 由 3 部分组成:

stat:此为状态信息, 描述该 Znode 的版本, 权限等信息

data:与该 Znode 关联的数据

children:该 Znode 下的子节点

6 Znode节点类型

6.1 Znode 有两种,分别为临时节点和永久节点。节点的类型在创建时即被确定,并且不能改变。


临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话结束,临时 节点将被自动删除,当然可以也可以手动删除。临时节点不允许拥有子节点。

永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。

6.2 Znode 还有一个序列化的特性,如果创建的时候指定的话,该 Znode 的名字后面会自动追加一个不断增加的序列号。序列号对于此节点的父节点来说是唯一的,这样便会记录每个子节点创建的先后顺序。它的格式为“%10d”(10 位数字,没有数值的数位用 0 补充,例如“0000000001”)。


6.3 这样便会存在四种类型的 Znode 节点,分别对应:


PERSISTENT:永久节点

EPHEMERAL:临时节点

PERSISTENT_SEQUENTIAL:永久节点、序列化

EPHEMERAL_SEQUENTIAL:临时节点、序列化

7.Zookeeper的Shell 客户端操作

7.1 登录Zookeeper客户端

bin/zkCli.sh  -server node01:2181

7.2 Zookeeper客户端操作命令

命令 说明 参数
create [-s] [-e] path data acl 创建Znode -s 指定是顺序节点
-e 指定是临时节点
ls path [watch] 列出Path下所有子Znode
get path [watch] 获取Path对应的Znode的数据和属性
ls2 path [watch] 查看Path下所有子Znode以及子Znode的属性
set path data [version] 更新节点 version 数据版本
delete path [version] 删除节点, 如果要删除的节点有子Znode则无法删除 version 数据版本
rmr path 删除节点, 如果有子Znode则递归删除
setquota -n|-b val path 修改Znode配额 -n 设置子节点最大个数
-b 设置节点数据最大长度
history 列出历史记录

7.3 操作实例

列出Path下的所有Znode

ls /

创建永久节点

create /hello world

创建临时节点:

create -e /abc 123

创建永久序列化节点:

create -s /zhangsan boy

创建临时序列化节点:

create -e -s /lisi boy

修改节点数据

set /hello zookeeper

删除节点, 如果要删除的节点有子Znode则无法删除

delete /hello

删除节点, 如果有子Znode则递归删除

rmr /abc

列出历史记录

histroy

7.4 节点属性

每个 znode 都包含了一系列的属性,通过命令 get,可以获得节点的属性。

dataVersion:数据版本号,每次对节点进行 set 操作,dataVersion 的值都会增加 1(即使设置的是相同的数据),可有效避免了数据更新时出现的先后顺序问题。


cversion :子节点的版本号。当 znode 的子节点有变化时,cversion 的值就会增加 1。


aclVersion :ACL 的版本号。


cZxid :Znode 创建的事务 id。


mZxid :Znode 被修改的事务 id,即每次对 znode 的修改都会更新 mZxid。


对于 zk 来说,每次的变化都会产生一个唯一的事务 id,zxid(ZooKeeper Transaction Id)。通过 zxid,可以确定更新操作的先后顺序。例如,如果 zxid1

小于 zxid2,说明 zxid1 操作先于 zxid2 发生,zxid 对于整个 zk 都是唯一的,

ctime:节点创建时的时间戳.


mtime:节点最新一次更新发生时的时间戳.


ephemeralOwner:如果该节点为临时节点, ephemeralOwner 值表示与该节点绑定的 session id. 如果不 是,ephemeralOwner 值为 0.


7.5 Zookeeper的watch机制


通知类似于数据库中的触发器, 对某个Znode设置 Watcher, 当Znode发生变化的时候, WatchManager会调用对应的Watcher

当Znode发生删除, 修改, 创建, 子节点修改的时候, 对应的Watcher会得到通知

Watcher的特点

一次性触发 一个 Watcher 只会被触发一次, 如果需要继续监听, 则需要再次添加 Watcher

事件封装: Watcher 得到的事件是被封装过的, 包括三个内容 keeperState, eventType, path

KeeperState EventType 触发条件 说明
None 连接成功
SyncConnected NodeCreated Znode被创建 此时处于连接状态
SyncConnected NodeDeleted Znode被删除 此时处于连接状态
SyncConnected NodeDataChanged Znode数据被改变 此时处于连接状态
SyncConnected NodeChildChanged Znode的子Znode数据被改变 此时处于连接状态
Disconnected None 客户端和服务端断开连接 此时客户端和服务器处于断开连接状态
Expired None 会话超时 会收到一个SessionExpiredExceptio
AuthFailed None 权限验证失败 会收到一个AuthFailedException

8: zookeeper的JavaAPI操作

这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多Zookeeper客户端非常底层的细节开发工作 。

Curator包含了几个包:


curator-framework:对zookeeper的底层api的一些封装

curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等

Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):


8.1、创建java工程,导入jar包


创建maven java工程,导入jar包

  <!-- <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories> -->
    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

8.2 节点的操作

创建永久节点

@Test
public void createNode() throws Exception {
   RetryPolicy retryPolicy = new  ExponentialBackoffRetry(1000, 1);
   //获取客户端对象
   CuratorFramework client =     CuratorFrameworkFactory.newClient("192.168.174.100:2181,192.168.174.110:2181,192.168.174.120:2181", 1000, 1000, retryPolicy);
  //调用start开启客户端操作
  client.start();
  //通过create来进行创建节点,并且需要指定节点类型
  client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello3/world");
 client.close();
}

创建临时节点

public void createNode2() throws Exception {
  RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
  CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello5/world");
Thread.sleep(5000);
client.close();
}

修改节点数据

/**
   * 节点下面添加数据与修改是类似的,一个节点下面会有一个数据,新的数据会覆盖旧的数据
   * @throws Exception
   */
  @Test
  public void nodeData() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
    client.start();
    client.setData().forPath("/hello5", "hello7".getBytes());
    client.close();
}

节点数据查询

  /**
   * 数据查询
   */
  @Test
  public void updateNode() throws Exception {
    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 3000, 3000, retryPolicy);
    client.start();
    byte[] forPath = client.getData().forPath("/hello5");
    System.out.println(new String(forPath));
    client.close();
  }
  /**
   * zookeeper的watch机制
   * @throws Exception
   */
  @Test
  public void watchNode() throws Exception {
    RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", policy);
    client.start();
    // ExecutorService pool = Executors.newCachedThreadPool();  
          //设置节点的cache  
          TreeCache treeCache = new TreeCache(client, "/hello5");  
          //设置监听器和处理过程  
          treeCache.getListenable().addListener(new TreeCacheListener() {  
              @Override  
              public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {  
                  ChildData data = event.getData();  
                  if(data !=null){  
                      switch (event.getType()) { 
                      case NODE_ADDED:  
                          System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));  
                          break;  
                      case NODE_REMOVED:  
                          System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));  
                          break;  
                      case NODE_UPDATED:  
                          System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));  
                          break;  
                      default:  
                          break;  
                      }  
                  }else{  
                      System.out.println( "data is null : "+ event.getType());  
                  }  
              }  
          });  
          //开始监听  
          treeCache.start();  
          Thread.sleep(50000000);
  }
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
阿里云实时数仓实战 - 项目介绍及架构设计
课程简介 1)学习搭建一个数据仓库的过程,理解数据在整个数仓架构的从采集、存储、计算、输出、展示的整个业务流程。 2)整个数仓体系完全搭建在阿里云架构上,理解并学会运用各个服务组件,了解各个组件之间如何配合联动。 3&nbsp;)前置知识要求 &nbsp; 课程大纲 第一章&nbsp;了解数据仓库概念 初步了解数据仓库是干什么的 第二章&nbsp;按照企业开发的标准去搭建一个数据仓库 数据仓库的需求是什么 架构 怎么选型怎么购买服务器 第三章&nbsp;数据生成模块 用户形成数据的一个准备 按照企业的标准,准备了十一张用户行为表 方便使用 第四章&nbsp;采集模块的搭建 购买阿里云服务器 安装 JDK 安装 Flume 第五章&nbsp;用户行为数据仓库 严格按照企业的标准开发 第六章&nbsp;搭建业务数仓理论基础和对表的分类同步 第七章&nbsp;业务数仓的搭建&nbsp; 业务行为数仓效果图&nbsp;&nbsp;
目录
相关文章
|
6天前
|
存储 分布式计算 大数据
Hadoop 生态圈中的组件如何协同工作来实现大数据处理的全流程
Hadoop 生态圈中的组件如何协同工作来实现大数据处理的全流程
|
2月前
|
SQL 分布式计算 资源调度
常用大数据组件的Web端口号总结
这是关于常用大数据组件Web端口号的总结。通过虚拟机名+端口号可访问各组件服务:Hadoop HDFS的9870,YARN的ResourceManager的8088和JobHistoryServer的19888,Zeppelin的8000,HBase的10610,Hive的10002。ZooKeeper的端口包括客户端连接的2181,服务器间通信的2888以及选举通信的3888。
54 2
常用大数据组件的Web端口号总结
|
2月前
|
消息中间件 分布式计算 大数据
大数据组件之storm简介
大数据组件之storm简介
33 2
|
2月前
|
监控 大数据 数据处理
大数据组件之Storm简介
【5月更文挑战第2天】Apache Storm是用于实时大数据处理的分布式系统,提供容错和高可用的实时计算。核心概念包括Topology(由Spouts和Bolts构成的DAG)、Spouts(数据源)和Bolts(数据处理器)。Storm通过acker机制确保数据完整性。常见问题包括数据丢失、性能瓶颈和容错理解不足。避免这些问题的方法包括深入学习架构、监控日志、性能调优和编写健壮逻辑。示例展示了实现单词计数的简单Topology。进阶话题涵盖数据延迟、倾斜的处理,以及Trident状态管理和高级实践,强调调试、性能优化和数据安全性。
35 4
|
2月前
|
SQL 分布式计算 Hadoop
假如大数据组件中的动物都变成神奇宝贝,那会变成什么样?
假如大数据组件中的动物都变成神奇宝贝,那会变成什么样?
36 1
|
2月前
|
SQL 分布式计算 Hadoop
假如大数据组件中的动物都变成神奇宝贝,那会变成什么样?(大数据的组件动漫化)
假如大数据组件中的动物都变成神奇宝贝,那会变成什么样?(大数据的组件动漫化)
56 1
|
2月前
|
监控 物联网 大数据
助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】
助力工业物联网,工业大数据之服务域:AirFlow的架构组件【三十二】
64 0
|
2月前
|
大数据 Java 分布式数据库
使用记忆法打造你的大数据组件的默认端口号记忆宫殿
使用记忆法打造你的大数据组件的默认端口号记忆宫殿
40 0
|
2月前
|
大数据
大数据组件的默认端口号思维导图
大数据组件的默认端口号思维导图
32 0
|
2月前
|
Ubuntu 大数据 Linux
【大数据组件】一篇文章让你快速入门Docker
【大数据组件】一篇文章让你快速入门Docker
53 0