基于Zookeeper的分布式锁研究

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 分布式环境下如何保证一个数据在并发的情况下保证同一时间在一台机器只有一个线程在执行? 之前研究过基于redis的分布式锁,这里研究一下基于zookeeper的

分布式环境下如何保证一个数据在并发的情况下保证同一时间在一台机器只有一个线程在执行?

实现一个分布式锁需要解决的问题有以下几个主要点:

  • 资源被唯一执行
  • 良好的可重入机制:发生死锁情况下可被其他进程可重入
  • 良好的性能问题:操作简单,额外的请求不要过多
  • 容灾机制:个别的锁机器崩溃也能稳定运行

在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。

  • 持久节点(PERSISTENT)
  • 持久顺序节点(PERSISTENT_SEQUENTIAL)
  • 临时节点(EPHEMERAL)
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

这里我们就基于临时有序节点去实现分布式锁:
1、首先创建一个根节点用于存放分布式锁需要新建的临时节点
2、在创建临时有序节点的时候,zk会根据client的创建请求会按顺序依次生成0000001、0000002、0000003这种类似序号的节点
3、每个client判断当前自己创建的节点是否为序号最小的节点,是的话就认为拿到了锁,可以执行业务代码,否则就去监听最小节点的删除时间
4、当处理完后需要删除节点,这样就能被其他线程监听到,开始竞争锁
image

当然,聪明的同学一看就能看出这个步骤有很多的问题,首先就是最小节点的删除时间会触发大量client的启动执行,需要发送大量的通知,而且如果在去创建监听的时候出现了最小节点已经被删除的情况,这时候永远都收不到最小节点的删除事件,而且如果按不同的资源创建了太多的目录的话,是否会对整个zk的性能产生影响,如果对不同资源采用同一个目录话,那么一个目录下的节点又过多,取子节点的性能又有问题...

所以该分布式锁的实现步骤可以优化一下
1、首先不必所有client都去监听最小节点的删除事件,只需要监听比自己稍小点的节点的删除事件即可
2、每此执行完处理后可以尝试性删除目录,避免随着时间的增长创建了过多的目录
3、另外zookeeper提供的API中设置监听器的操作与读操作是原子执行的,也就是说在读子节点列表时同时设置监听器,保证不会丢失事件,所以可以在创建监听的时候一旦发现监听的节点为空就认为节点已删除,可以拿到锁

下面是代码:里面包含了测试main方法
https://github.com/fengym201507411/lock

package com.fym;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by fengyiming on 2018/12/24.
 * 基于zookeeper的代码
 */
public class LockServiceImpl {

    private static volatile ZooKeeper zooKeeper;

    /**
     * zk连接超时时间/s
     */
    private final static int SESSION_TIMEOUT = 10000;

    /**
     * 分布式锁创建key的根路径
     */
    private final static String PRE_ROOT_PATH = "/zkLockRoot";

    /**
     * /字符
     */
    private final static String PATH = "/";

    static {
        try {
            // 连接zookeeper
            zooKeeper = new ZooKeeper("127.0.0.1:2181", SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {

                }
            });
            Stat stat = zooKeeper.exists(PRE_ROOT_PATH, false);
            if (stat == null) {
                System.out.println("root path is null,create......");
                zooKeeper.create(PRE_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            System.out.println("root path ok");
        } catch (Exception e) {
            System.out.println("加载zk信息异常" + e.getMessage());
        }
    }


    public static void lock(String threadName, String key, long waitSeconds) {
        try {
            key = key + LocalDate.now().toString();
            System.out.println(threadName + "begin lock");
            String path = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).toString();
            Stat stat = zooKeeper.exists(path, false);
            if (stat == null) {
                System.out.println(threadName + "key  path is null,create......");
                try {
                    zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException k) {
                    System.out.println(threadName + "目录" + path + "已存在,无需重复创建");
                } catch (Exception e) {
                    System.out.println(threadName + "create key path error,error message:" + e.getMessage());
                }
            }
            LocalDateTime begin = LocalDateTime.now();
            String lockNodePre = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH).toString();
            String lockNode = zooKeeper.create(lockNodePre, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode
                .EPHEMERAL_SEQUENTIAL);
            String lockNodeName = lockNode.substring(lockNodePre.length());
            System.out.println(threadName + "创建有序临时节点:" + lockNode + ",节点名称:" + lockNodeName);
            // 取所有子节点
            List<String> subNodes = zooKeeper.getChildren(path, false);
            System.out.println(threadName + "当前竞争资源下的节点数:" + subNodes.size());
            //排序
            subNodes.sort(String::compareTo);
            System.out.println(threadName + "first:" + subNodes.get(0) + ",last: " + subNodes.get(subNodes.size() - 1));
            int index = subNodes.indexOf(lockNodeName);
            String minNodeName = subNodes.get(0);
            if (!lockNodeName.equals(minNodeName)) {
                String min1NodeName = subNodes.get(index - 1);
                String min1NodePath = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH)
                    .append(min1NodeName).toString();
                CountDownLatch countDownLatch = new CountDownLatch(1);
                System.out.println(threadName + "当前节点" + lockNodeName + "准备监听节点:" + min1NodeName);
                Stat min1Stat = zooKeeper.exists(min1NodePath, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println(threadName + "节点" + event.getPath() + ",事件 : " + event.getType());
                        if (event.getType() == Event.EventType.NodeDeleted) {
                            System.out.println(threadName + "节点删除");
                            countDownLatch.countDown();
                        }
                    }
                });
                if (min1Stat == null) {
                    System.out.println(threadName + "节点不存在,无需等待,当前节点:" + lockNodeName + ",前一节点:" + min1NodeName);
                } else {
                    System.out.println(threadName + "------wait-------");
                    countDownLatch.await();
                }
            }
            //超时
            if(LocalDateTime.now().compareTo(begin.plusSeconds(waitSeconds)) > 0){
                throw new Exception("waite time out");
            }
            System.out.println(threadName + "拿到了lock" + lockNodeName + ",do -----");
            zooKeeper.delete(lockNode, -1);
            System.out.println(threadName + "执行完毕,解锁" + lockNodeName + "------");
            String lastNodeName = subNodes.get(subNodes.size() - 1);
            if (lockNodeName.equals(lastNodeName)) {
                try {
                    zooKeeper.delete(path, -1);
                    System.out.println(threadName + "尝试删除该key目录成功,path" + path);
                } catch (KeeperException k) {
                    System.out.println(threadName + "尝试删除该key目录,失败:" + k.getMessage());
                } catch (Exception e) {
                    System.out.println(threadName + "尝试删除该key目录,失败:" + e.getMessage());
                }
            }
        } catch (Exception e) {
            System.out.println(threadName + "lock error" + e.getMessage());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new
            LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 1000; i++) {
            String name = new StringBuffer("ThreadName[").append(i).append("]").toString();
            executor.execute(() -> {
                LockServiceImpl.lock(name, "firstLock",10);
            });
        }
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
29天前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
1月前
|
算法 前端开发
|
26天前
|
NoSQL 前端开发 算法
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
Redis问题之Redis分布式锁与Zookeeper分布式锁有何不同
|
2月前
|
Shell 虚拟化
分布式系统详解--框架(Zookeeper-基本shell命令)
分布式系统详解--框架(Zookeeper-基本shell命令)
33 1
|
1月前
|
安全 Java
使用Zookeeper实现分布式锁的最佳实践
使用Zookeeper实现分布式锁的最佳实践
|
2月前
|
缓存 NoSQL 数据库
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
分布式系统面试全集通第一篇(dubbo+redis+zookeeper----分布式+CAP+BASE+分布式事务+分布式锁)
61 0
|
2月前
|
设计模式 监控 安全
一文搞懂:zookeeper实现分布式锁安全用法
一文搞懂:zookeeper实现分布式锁安全用法
29 0
|
2月前
|
Java 网络安全
分布式系统详解--框架(Zookeeper-简介和集群搭建)
分布式系统详解--框架(Zookeeper-简介和集群搭建)
118 0
|
3月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
464 2
|
3月前
|
监控 Dubbo 前端开发
快速入门分布式系统与Dubbo+zookeeper Demo
快速入门分布式系统与Dubbo+zookeeper Demo
460 0

热门文章

最新文章