Zookeeper分布式锁

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Zookeeper分布式锁


大多数互联网系统都是分布式部署的,分布式部署确实能带来性能和效率上的提升,但为此,我们就需要多解决一个分布式环境下,数据一致性的问题 在单机环境中,我们可以通过java提供的并发API(lock、syn等)来解决,而在分布式环境下要复杂的多(跨JVM),常见的方案是分布式事物、分布式锁等。

Zookeeper实现分布式锁

业务场景和产生的问题

在分布式环境下,生产全局订单号,由于多个客户端不能实现同步,在分布式场景下使用时间戳生产订单号可能会重复 解决方案:

  1. 使用分布式锁
  2. 提前生产好订单号,存放到redis,然后从redis中取

Zookeeper基于同名节点的分布式锁

Zookeeper的强一致性能够很好地保证在分布式高并发情况下节点的创建能够保证全局唯一性,利用Zookeeper的这个特性,实现排它锁。

  • 定义锁:Zookeeper上的节点表示一个锁
  • 获取锁:利用zkClient客户端调用create方法创建一个临时节点(锁),创建成功的客户端获得了锁
  • 监控锁:同时在该节点上注册Watcher监听,实时监听该节点的变更情况
  • 释放锁:当前获得锁的客户端宕机或者异常,Zookeeper上这个临时节点就会被删除;客户端主动删除该临时节点

具体实现:

  1. 创建一个Lock接口,这个并不是JDK提供的,只是模拟了该接口里的方法自定义实现,按需使用。
package com.ooliuyue.zookeeperlock.zk;
public interface Lock {
    public void  getLock();
    public void unLock();
}
复制代码
  1. 创建抽象类AbstratcLock实现Lock接口(模板)
package com.ooliuyue.zookeeperlock.zk;
/**
* @Auther: ly
* @Date: 2019/4/25 11:31
*/
public abstract class AbstracLock implements Lock {
   public void getLock() {
       //尝试获得锁资源
       if (tryLock()) {
           System.out.println("##获取Lock锁的资源##");
       } else {
           //等待(监控)
           waitLock();
           //重新获取锁
           getLock();
       }
   }
   public abstract boolean tryLock();
   public abstract void waitLock();
}
复制代码
  1. 创建类ZookeeperAbstractLock(重复代码写入子类), 用于配置Zookeeper
package com.ooliuyue.zookeeperlock.zk;
import org.I0Itec.zkclient.ZkClient;
/**
 * @Auther: ly
 * @Date: 2019/4/25 11:41
 */
//将重复代码写入子类中
public abstract class ZookeeperAbstractLock extends AbstracLock {
    //zk连接地址
    private static final String CONNECTSTRING = "127.0.0.1:2181";
    //创建zk连接
    protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
    protected static final String PATH = "/lock";
    protected static final String PATH2 = "/lock2";
}
复制代码
  1. zookeeper实现分布式锁的业务逻辑
package com.ooliuyue.zookeeperlock.zk;
import org.I0Itec.zkclient.IZkDataListener;
import java.util.concurrent.CountDownLatch;
/**
 * @Auther: ly
 * @Date: 2019/4/25 11:45
 */
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
    private CountDownLatch countDownLatch = null;
    @Override
    //尝试获得锁
    public boolean tryLock() {
        try {
            zkClient.createEphemeral(PATH);
            System.out.println("当前线程获取锁" + PATH + "成功");
            return true;
        } catch (Exception e) {
            //如果创建失败抛出异常
//            e.printStackTrace();
            return false;
        }
    }
    @Override
    public void waitLock() {
        /*当前节点数据内容或版本发生变化或者当前节点被删除,触发当前接口 */
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String path, Object o) throws Exception {
            }
            @Override
            /* 删除当前节点,触发该接口通知。此时,path即当前节点路径 */
            public void handleDataDeleted(String path) throws Exception {
                //唤醒被等待的线程
                if (countDownLatch != null ) {
                    countDownLatch.countDown();
                }
            }
        };
        //监听节点PATH的变化
        zkClient.subscribeDataChanges(PATH,iZkDataListener);
        //如果节点存在
        if (zkClient.exists(PATH)) {
            countDownLatch = new CountDownLatch(1);
            try {
                //等待,一直到监听器发起删除节点的通知,表示锁释放
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //删除监听
        zkClient.unsubscribeDataChanges(PATH,iZkDataListener);
    }
    public void unLock() {
        //释放锁
        if (zkClient != null) {
            zkClient.delete(PATH);
            zkClient.close();
            System.out.println("释放锁资源。。");
        }
    }
}
复制代码
  1. 创建测试类OrderService,模拟10个线程并发生产订单
package com.ooliuyue.zookeeperlock.zk;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
/**
 * @Auther: ly
 * @Date: 2019/4/25 14:39
 */
public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private Lock lock = new ZookeeperDistrbuteLock();
    @Override
    public void run() {
        getNum();
    }
    public void getNum() {
        try {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生产订单ID:" + number);
        } catch (Exception e) {
//            e.printStackTrace();
        } finally {
            lock.unLock();
        }
    }
    public static void main(String[] args) {
        System.out.println("##生产唯一订单号##");
        for (int i = 0; i < 10 ; i++) {
            new Thread(new OrderService()).start();
        }
    }
}
复制代码
  1. 生成订单的类
package com.ooliuyue.zookeeperlock.zk;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @Auther: ly
 * @Date: 2019/4/25 14:34
 */
public class OrderNumGenerator {
    private static int count = 0;
    public String getNumber() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpleDateFormat.format(new Date()) + "-" + ++count;
    }
}
复制代码

运行main函数,控制台输出结果

##生产唯一订单号##
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-3,生产订单ID:2019-04-28-11-42-53-1
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-7,生产订单ID:2019-04-28-11-42-53-2
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-9,生产订单ID:2019-04-28-11-42-53-3
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-11,生产订单ID:2019-04-28-11-42-53-4
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-1,生产订单ID:2019-04-28-11-42-53-5
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-15,生产订单ID:2019-04-28-11-42-53-6
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-17,生产订单ID:2019-04-28-11-42-53-7
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-19,生产订单ID:2019-04-28-11-42-53-8
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-13,生产订单ID:2019-04-28-11-42-53-9
释放锁资源。。
当前线程获取锁/lock成功
##获取Lock锁的资源##
Thread-5,生产订单ID:2019-04-28-11-42-53-10
释放锁资源。。
复制代码

思路就是通过监听机制,当节点被删除时,表示锁释放,其他线程去获取锁。 这个方式是有问题的,当锁释放时,会有很多线程同时去获取锁,羊群效应。性能会比较差,不过实现起来简单。

Zookeeper基于临时顺序节点实现分布式锁

具体实现

  • 多个进程访问共享资源时,为它们在一个父节点下分别创建临时有序节点
  • 判断创建的节点是否是所有节点中序号最小的
  • 如果是序号最小的节点,对应的进程获得锁。否则通过watcher机制监听比自己小的那个节点(这里需要进行阻塞),当收到删除的通知,然后去获取锁
  • 释放锁的时候删除当前节点

代码如下:

package com.ooliuyue.zookeeperlock.zk;
import org.I0Itec.zkclient.IZkDataListener;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: ly
* @Date: 2019/4/26 10:55
*/
public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {
   private CountDownLatch countDownLatch = null;
   private String beforePath; //当前线程的节点前一个节点
   private String currentPath; //当前线程的节点
   public ZookeeperDistrbuteLock2(){
       if (!this.zkClient.exists(PATH2)) {
           this.zkClient.createPersistent(PATH2);
       }
   }
   @Override
   public boolean tryLock() {
       //如果currentPath为空则为当前线程尝试加锁
       if (currentPath == null || currentPath.length() <= 0) {
           //创建一个临时顺序节点
           currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
           System.out.println("当前线程的锁为===" + currentPath);
       }
       //获取所有临时节点并排序,临时节点名称为自增长字符串如:/lock2/0000000004
       List<String> children = this.zkClient.getChildren(PATH2);
       Collections.sort(children);
       //如果当前线程节点在所有节点中排名第一则获取锁成功
       if (currentPath.equals(PATH2 + '/' + children.get(0))) {
           System.out.println(currentPath + "===节点获取锁成功");
           return true;
       } else {
           //如果当前线程节点在所有节点中不是排第一,则获取前面的节点名称,并赋值给beforePath
           int i = Collections.binarySearch(children, currentPath.substring(7));
           beforePath = PATH2 + '/' + children.get(i - 1);
       }
       return false;
   }
   @Override
   public void waitLock() {
       IZkDataListener iZkDataListener = new IZkDataListener() {
           @Override
           public void handleDataChange(String s, Object o) throws Exception {
           }
           @Override
           public void handleDataDeleted(String s) throws Exception {
               if (countDownLatch != null) {
                   countDownLatch.countDown();
               }
           }
       };
       //给排在前面的节点增加数据删除的watcher,(启动另外一个线程去监听它)
       System.out.println(currentPath + "===监控beforPath===" + beforePath);
       this.zkClient.subscribeDataChanges(beforePath,iZkDataListener);
       if (this.zkClient.exists(beforePath)) {
           countDownLatch = new CountDownLatch(1);
           try {
               //监听线程执行完毕后删除监听
               countDownLatch.await();
           } catch (InterruptedException e) {
//                e.printStackTrace();
           }
       }
       this.zkClient.unsubscribeDataChanges(beforePath,iZkDataListener);
   }
   @Override
   public void unLock() {
       System.out.println(currentPath + "===释放锁资源...");
       zkClient.delete(currentPath);
       zkClient.close();
   }
}
复制代码

运行main函数进行测试

package com.ooliuyue.zookeeperlock.zk;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: ly
* @Date: 2019/4/25 14:39
*/
public class OrderService implements Runnable {
   private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
   private Lock lock = new ZookeeperDistrbuteLock2();
   @Override
   public void run() {
       getNum();
   }
   public void getNum() {
       try {
           lock.getLock();
           String number = orderNumGenerator.getNumber();
           System.out.println(Thread.currentThread().getName() + ",生产订单ID:" + number);
       } catch (Exception e) {
//            e.printStackTrace();
       } finally {
           lock.unLock();
       }
   }
   public static void main(String[] args) {
       System.out.println("##生产唯一订单号##");
       for (int i = 0; i < 10 ; i++) {
           new Thread(new OrderService()).start();
       }
   }
}
复制代码

控制台输出结果:

##生产唯一订单号##
当前线程的锁为===/lock2/0000000083
当前线程的锁为===/lock2/0000000084
/lock2/0000000083===节点获取锁成功
Thread-1,生产订单ID:2019-04-26-14-12-37-1
/lock2/0000000083===释放锁资源...
当前线程的锁为===/lock2/0000000085
当前线程的锁为===/lock2/0000000086
当前线程的锁为===/lock2/0000000087
/lock2/0000000086===监控beforPath===/lock2/0000000085
/lock2/0000000085===监控beforPath===/lock2/0000000084
/lock2/0000000084===监控beforPath===/lock2/0000000083
/lock2/0000000087===监控beforPath===/lock2/0000000086
当前线程的锁为===/lock2/0000000088
/lock2/0000000084===节点获取锁成功
Thread-3,生产订单ID:2019-04-26-14-12-37-2
/lock2/0000000084===释放锁资源...
当前线程的锁为===/lock2/0000000089
/lock2/0000000089===监控beforPath===/lock2/0000000088
当前线程的锁为===/lock2/0000000090
/lock2/0000000090===监控beforPath===/lock2/0000000089
当前线程的锁为===/lock2/0000000091
/lock2/0000000091===监控beforPath===/lock2/0000000090
/lock2/0000000085===节点获取锁成功
Thread-5,生产订单ID:2019-04-26-14-12-37-3
/lock2/0000000085===释放锁资源...
当前线程的锁为===/lock2/0000000092
/lock2/0000000086===节点获取锁成功
Thread-7,生产订单ID:2019-04-26-14-12-37-4
/lock2/0000000086===释放锁资源...
/lock2/0000000087===节点获取锁成功
Thread-9,生产订单ID:2019-04-26-14-12-37-5
/lock2/0000000087===释放锁资源...
/lock2/0000000088===节点获取锁成功
Thread-11,生产订单ID:2019-04-26-14-12-37-6
/lock2/0000000088===释放锁资源...
/lock2/0000000089===节点获取锁成功
Thread-13,生产订单ID:2019-04-26-14-12-37-7
/lock2/0000000089===释放锁资源...
/lock2/0000000090===节点获取锁成功
Thread-15,生产订单ID:2019-04-26-14-12-37-8
/lock2/0000000090===释放锁资源...
/lock2/0000000091===节点获取锁成功
Thread-17,生产订单ID:2019-04-26-14-12-37-9
/lock2/0000000091===释放锁资源...
/lock2/0000000092===节点获取锁成功
Thread-19,生产订单ID:2019-04-26-14-12-37-10
/lock2/0000000092===释放锁资源...
复制代码



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
15天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
30 2
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
48 0
|
2月前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
64 0
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
298 0
|
3月前
|
监控 前端开发 Java
JAVAEE分布式技术之Zookeeper技术
JAVAEE分布式技术之Zookeeper技术
16 0
JAVAEE分布式技术之Zookeeper技术
|
3月前
|
NoSQL 测试技术 Redis
Zookeeper实现分布式锁
ZooKeeper是一个分布式协调服务,其中提供的序列化、持久化、有层次的目录结构使得它非常适合用于实现分布式锁。在ZooKeeper中,分布式锁通常通过临时有序节点实现
|
3月前
|
存储 算法 Java
【分布式】Zookeeper 使用环境搭建
【1月更文挑战第25天】【分布式】Zookeeper 使用环境搭建
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
61 0
|
3月前
|
NoSQL 中间件 API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
82 2