5, zookeeper的ACL权限控制
主要由三部分组成,有权限模式,授权对象,权限信息。
5.1,权限信息
创建结点权限:授权对象可以在数据结点下面创建子结点
更新结点权限:授权对象可以在数据结点下面更新子结点
读取结点权限:授权对象可以读取该节点的内容以及子结点的列表信息
删除结点权限:删除数据结点的子结点
管理者权限:授权对象可以对该数据结点进行ACL权限设置
5.2,命令
getAcl:获取某个节点的acl权限信息
setAcl:设置某个节点的acl权限信息
addauth: 输入认证授权信息,相当于注册用户信息
加密如下:
#通过‐sha1的方法进行加密,base64的方法进行编码 echo ‐n user1:pass1 | openssl dgst ‐binary ‐sha1 | openssl base64
创建节点并进行加密
create / zk‐node datatest digest:gj:X/NSthOB0fD/OT6iilJ55WJVado=:cdrwa
或者直接使用setAcl
setAcl / zk‐node digest:gj:X/NSthOB0fD/OT6iilJ55WJVado=:cdrwa
因此访问前需要进行授权
addauth digest zhs:test get /zk‐node #获取
也可以使用明文的方式进行授权
#首先需要先登录 addauth digest u100:p100 #直接通过明文的方式创建节点 create /node‐1 node1data auth:u100:p100:cdwra 这是u100用户授权信息会被zk保存,可以认为当前的授权用户为u100 get /node‐1
也可以通过这个ip地址进行授权,也是需要登录才能访问结点
setAcl /node‐ip ip:192.168.109.128:cdwra create /node‐ip data ip:192.168.109.128:cdwra #登录 addauth digest u100:p100
6,ZooKeeper 内存数据和持久化
Zookeeper数据的组织形式为一个类似文件系统的数据结构,而这些数据都是存储在内存中的,所以我们可以认为,Zookeeper是一个基于内存的小型数据库。由ConcurrentHashMap存储数据。每一个操作都会存为一个事务日志,每一个操作都会持久化。
持久化数据方式主要有日志和快照。快照内存相对较小,日志的话会提前申请一个默认大小的空间,因此每个日志文件的大小都一样。
7,zookeeper代码实现
7.1,需要的依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.1</version> </dependency>
7.2,创建zookeeper客户端
//创建zookeeper客户端 zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType()== Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){ log.info("连接已建立"); countDownLatch.countDown(); } } });
zookeeper启动主要是通过线程的方式启动,里面主要包括两个线程,一个用来发送数据,一个来接受数据,并进行事件的监听。
this.cnxn.start(); public void start() { //客户端往服务端发送数据 this.sendThread.start(); //接收客户端响应的线程,同时还有事件线程 this.eventThread.start(); }
进入源码可以发现,这两个线程其实就是一个守护线程
//业务现场在运行的时候,守护线程会自动退出 this.setDaemon(true);
创建持久化结点
/** */myconfig:结点名称 *ZooDefs.Ids.OPEN_ACL_UNSAFE:用于全部权限 */ zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
基于乐观锁实现这个修改数据。通过dataVersion实现
@Test public void testSetData() throws KeeperException, InterruptedException { ZooKeeper zooKeeper = getZooKeeper(); Stat stat = new Stat(); //false:不监听 byte[] data = zooKeeper.getData(first_node, false, stat); // int version = stat.getVersion(); zooKeeper.setData(first_node, "third".getBytes(), 0); }
创建结点可以通过同步的方式实现,也可以同步异步的方式实现
//通过同步的方式实现,如果消息没有收到,就会阻塞 byte[] data = getZooKeeper().getData(first_node, watcher, null); //也可以通过异步的方式实现,即通过一个异步的方式进行回调 getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> { Thread thread = Thread.currentThread(); },"test");
8,Curator实战
这个是用Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁等
8.1,需要的依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>5.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency>
8.2,创建Curator实体
curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()) .retryPolicy(retryPolicy) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .canBeReadOnly(true) .build(); //对连接进行一个监听 curatorFramework.getConnectionStateListenable().addListener((client, newState) -> { if (newState == ConnectionState.CONNECTED) { log.info("连接成功!"); } });
8.3,递归创建子结点
// 递归创建子节点,即子结点和子子结点都会被创建 @Test public void testCreateWithParent() throws Exception { //获取这个Curator连接 CuratorFramework cf = getCuratorFramework(); String pathWithParent = "/node-parent/sub-node-1"; String path = cf.create().creatingParentsIfNeeded().forPath(pathWithParent); log.info("curator create node :{} successfully.", path); }
CuratorFramework curatorFramework = getCuratorFramework(); String forPath = curatorFramework .create() .withProtection() //起一个保护机制 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL). forPath("/curator-node", "some-data".getBytes()); log.info("curator create node :{} successfully.", forPath);
如客户端发送请求成功,但是服务端在响应的时候出现异常,那么这个客户端会进行这个重试的机制。在创建结点的时候会设置这个唯一id,如果创建成功,则不会重复创建。同时如果session过期的话,这个客户端也会将这个结点给删除。
8.4,删除结点
@Test public void testDelete() throws Exception { CuratorFramework cf = getCuratorFramework(); String pathWithParent = "/node-parent"; //删除当前结点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent); }
8.5,通过线程池去实现这个异步的创建结点的方式
@Test public void testThreadPool() throws Exception { CuratorFramework curatorFramework = getCuratorFramework(); ExecutorService executorService = Executors.newSingleThreadExecutor(); String ZK_NODE="/zk-node"; curatorFramework.getData().inBackground((client, event) -> { log.info(" background: {}", event); },executorService).forPath(ZK_NODE); }