基于Zookeeper的分布式锁研究

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 分布式环境下如何保证一个数据在并发的情况下保证同一时间在一台机器只有一个线程在执行? 之前研究过基于redis的分布式锁,这里研究一下基于zookeeper的

分布式环境下如何保证一个数据在并发的情况下保证同一时间在一台机器只有一个线程在执行?

实现一个分布式锁需要解决的问题有以下几个主要点:

  • 资源被唯一执行
  • 良好的可重入机制:发生死锁情况下可被其他进程可重入
  • 良好的性能问题:操作简单,额外的请求不要过多
  • 容灾机制:个别的锁机器崩溃也能稳定运行

在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。

  • 持久节点(PERSISTENT)
  • 持久顺序节点(PERSISTENT_SEQUENTIAL)
  • 临时节点(EPHEMERAL)
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

这里我们就基于临时有序节点去实现分布式锁:
1、首先创建一个根节点用于存放分布式锁需要新建的临时节点
2、在创建临时有序节点的时候,zk会根据client的创建请求会按顺序依次生成0000001、0000002、0000003这种类似序号的节点
3、每个client判断当前自己创建的节点是否为序号最小的节点,是的话就认为拿到了锁,可以执行业务代码,否则就去监听最小节点的删除时间
4、当处理完后需要删除节点,这样就能被其他线程监听到,开始竞争锁
image

当然,聪明的同学一看就能看出这个步骤有很多的问题,首先就是最小节点的删除时间会触发大量client的启动执行,需要发送大量的通知,而且如果在去创建监听的时候出现了最小节点已经被删除的情况,这时候永远都收不到最小节点的删除事件,而且如果按不同的资源创建了太多的目录的话,是否会对整个zk的性能产生影响,如果对不同资源采用同一个目录话,那么一个目录下的节点又过多,取子节点的性能又有问题...

所以该分布式锁的实现步骤可以优化一下
1、首先不必所有client都去监听最小节点的删除事件,只需要监听比自己稍小点的节点的删除事件即可
2、每此执行完处理后可以尝试性删除目录,避免随着时间的增长创建了过多的目录
3、另外zookeeper提供的API中设置监听器的操作与读操作是原子执行的,也就是说在读子节点列表时同时设置监听器,保证不会丢失事件,所以可以在创建监听的时候一旦发现监听的节点为空就认为节点已删除,可以拿到锁

下面是代码:里面包含了测试main方法
https://github.com/fengym201507411/lock

package com.fym;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by fengyiming on 2018/12/24.
 * 基于zookeeper的代码
 */
public class LockServiceImpl {

    private static volatile ZooKeeper zooKeeper;

    /**
     * zk连接超时时间/s
     */
    private final static int SESSION_TIMEOUT = 10000;

    /**
     * 分布式锁创建key的根路径
     */
    private final static String PRE_ROOT_PATH = "/zkLockRoot";

    /**
     * /字符
     */
    private final static String PATH = "/";

    static {
        try {
            // 连接zookeeper
            zooKeeper = new ZooKeeper("127.0.0.1:2181", SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {

                }
            });
            Stat stat = zooKeeper.exists(PRE_ROOT_PATH, false);
            if (stat == null) {
                System.out.println("root path is null,create......");
                zooKeeper.create(PRE_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            System.out.println("root path ok");
        } catch (Exception e) {
            System.out.println("加载zk信息异常" + e.getMessage());
        }
    }


    public static void lock(String threadName, String key, long waitSeconds) {
        try {
            key = key + LocalDate.now().toString();
            System.out.println(threadName + "begin lock");
            String path = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).toString();
            Stat stat = zooKeeper.exists(path, false);
            if (stat == null) {
                System.out.println(threadName + "key  path is null,create......");
                try {
                    zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException k) {
                    System.out.println(threadName + "目录" + path + "已存在,无需重复创建");
                } catch (Exception e) {
                    System.out.println(threadName + "create key path error,error message:" + e.getMessage());
                }
            }
            LocalDateTime begin = LocalDateTime.now();
            String lockNodePre = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH).toString();
            String lockNode = zooKeeper.create(lockNodePre, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode
                .EPHEMERAL_SEQUENTIAL);
            String lockNodeName = lockNode.substring(lockNodePre.length());
            System.out.println(threadName + "创建有序临时节点:" + lockNode + ",节点名称:" + lockNodeName);
            // 取所有子节点
            List<String> subNodes = zooKeeper.getChildren(path, false);
            System.out.println(threadName + "当前竞争资源下的节点数:" + subNodes.size());
            //排序
            subNodes.sort(String::compareTo);
            System.out.println(threadName + "first:" + subNodes.get(0) + ",last: " + subNodes.get(subNodes.size() - 1));
            int index = subNodes.indexOf(lockNodeName);
            String minNodeName = subNodes.get(0);
            if (!lockNodeName.equals(minNodeName)) {
                String min1NodeName = subNodes.get(index - 1);
                String min1NodePath = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH)
                    .append(min1NodeName).toString();
                CountDownLatch countDownLatch = new CountDownLatch(1);
                System.out.println(threadName + "当前节点" + lockNodeName + "准备监听节点:" + min1NodeName);
                Stat min1Stat = zooKeeper.exists(min1NodePath, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println(threadName + "节点" + event.getPath() + ",事件 : " + event.getType());
                        if (event.getType() == Event.EventType.NodeDeleted) {
                            System.out.println(threadName + "节点删除");
                            countDownLatch.countDown();
                        }
                    }
                });
                if (min1Stat == null) {
                    System.out.println(threadName + "节点不存在,无需等待,当前节点:" + lockNodeName + ",前一节点:" + min1NodeName);
                } else {
                    System.out.println(threadName + "------wait-------");
                    countDownLatch.await();
                }
            }
            //超时
            if(LocalDateTime.now().compareTo(begin.plusSeconds(waitSeconds)) > 0){
                throw new Exception("waite time out");
            }
            System.out.println(threadName + "拿到了lock" + lockNodeName + ",do -----");
            zooKeeper.delete(lockNode, -1);
            System.out.println(threadName + "执行完毕,解锁" + lockNodeName + "------");
            String lastNodeName = subNodes.get(subNodes.size() - 1);
            if (lockNodeName.equals(lastNodeName)) {
                try {
                    zooKeeper.delete(path, -1);
                    System.out.println(threadName + "尝试删除该key目录成功,path" + path);
                } catch (KeeperException k) {
                    System.out.println(threadName + "尝试删除该key目录,失败:" + k.getMessage());
                } catch (Exception e) {
                    System.out.println(threadName + "尝试删除该key目录,失败:" + e.getMessage());
                }
            }
        } catch (Exception e) {
            System.out.println(threadName + "lock error" + e.getMessage());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new
            LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 1000; i++) {
            String name = new StringBuffer("ThreadName[").append(i).append("]").toString();
            executor.execute(() -> {
                LockServiceImpl.lock(name, "firstLock",10);
            });
        }
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
30天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
100 11
|
3月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
73 2
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
72 1
|
3月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
64 0
|
4月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
4月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?