分布式锁——基于Zookeeper

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式锁——基于Zookeeper

正文


一、分布式锁


我们知道,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,使用synchronized关键字或者使用Lock锁,但是这些只是针对单个应用,也就是只能在同一个JVM生效。随着业务发展,单机应用已经不能满足我们的需要,我们需要集群,分布式,这个时候就需要考虑到分布式锁。


二、Zookeeper实现分布式锁的原理


根据zk临时节点的唯一性,当多个请求同时创建相同的节点,只要谁能够创建成功 谁就能够获取到锁。

在创建节点时,如果该节点已经被其他请求创建则进入等待。

只要能够创建节点成功,就认为获取到了锁,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;

正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。


三、代码


package com.xiaojie.template;
/**
 * 分布式锁
 */
public abstract class DistributeLock {
    public void getLock() {
        //获取锁
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + "获取锁成功");
        } else {
            //等待锁
            waitLock();
            //重新获取
            getLock();
        }
    }
    /**
     * 等待锁
     */
    protected abstract void waitLock();
    /**
     * 尝试获取锁
     */
    protected abstract boolean tryLock();
    /**
     * 释放锁
     */
    public abstract void unLock();
}
package com.xiaojie.template;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.CountDownLatch;
/**
 * 使用Zk获取分布式锁
 */
public class ZkDistributeLock extends DistributeLock {
    //参数1 连接地址
    private static final String ADDRESS = "192.168.139.154:2181,192.168.139.154:2182,192.168.139.154:2183";
    // 参数2 zk超时时间
    private static final int TIMEOUT = 5000;
    // 创建我们的zk连接
    private ZkClient zkClient = new ZkClient(ADDRESS, TIMEOUT);
    /**
     * 共同的创建临时节点
     */
    private String lockPath = "/myLock";
    private CountDownLatch countDownLatch = null;
    @Override
    protected void waitLock() {
/**
 * 使用事件通知,如果有节点删除,则重新开始竞争锁
 */
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }
            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    //如果删除节点,countDownLatch变为0,开始竞争锁
                    countDownLatch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(lockPath, iZkDataListener);
        // 2.使用countDownLatch等待
        if (countDownLatch == null) {
            countDownLatch = new CountDownLatch(1);
            System.out.println("开始等待锁。。。。。。。。。。");
        }
        try {
            // 如果当前计数器不是为0 就一直等待
            countDownLatch.await();
        } catch (Exception e) {
        }
        // 3. 如果当前节点被删除的情况下,需要重新进入到获取锁
        zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
    }
    /**
     * 尝试获取锁
     */
    @Override
    public boolean tryLock() {
        // 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁
        try {
            zkClient.createEphemeral(lockPath);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    /**
     * 释放锁。关闭临时节点,其他资源去竞争锁
     */
    @Override
    public void unLock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + ",释放了锁>>>");
        }
    }
}
package com.xiaojie.utils;
/**
 * 雪花算法生成id
 */
public class SnowFlake {
    /**
     * 起始的时间戳
     */
    private final static long START_STMP = 1480166465631L;
    /**
     * 每一部分占用的位数
     */
    private final static long SEQUENCE_BIT = 12; //序列号占用的位数
    private final static long MACHINE_BIT = 5;   //机器标识占用的位数
    private final static long DATACENTER_BIT = 5;//数据中心占用的位数
    /**
     * 每一部分的最大值
     */
    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
    /**
     * 每一部分向左的位移
     */
    private final static long MACHINE_LEFT = SEQUENCE_BIT;
    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
    private long datacenterId;  //数据中心
    private long machineId;     //机器标识
    private long sequence = 0L; //序列号
    private long lastStmp = -1L;//上一次时间戳
    public SnowFlake() {
    }
    public SnowFlake(long datacenterId, long machineId) {
        if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
            throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
        }
        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.datacenterId = datacenterId;
        this.machineId = machineId;
    }
    /**
     * 产生下一个ID
     *
     * @return
     */
    public long nextId() {
        long currStmp = getNewstmp();
        if (currStmp < lastStmp) {
            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
        }
        if (currStmp == lastStmp) {
            //相同毫秒内,序列号自增
            sequence = (sequence + 1) & MAX_SEQUENCE;
            //同一毫秒的序列数已经达到最大
            if (sequence == 0L) {
                currStmp = getNextMill();
            }
        } else {
            //不同毫秒内,序列号置为0
            sequence = 0L;
        }
        lastStmp = currStmp;
        return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
                | datacenterId << DATACENTER_LEFT       //数据中心部分
                | machineId << MACHINE_LEFT             //机器标识部分
                | sequence;                             //序列号部分
    }
    private long getNextMill() {
        long mill = getNewstmp();
        while (mill <= lastStmp) {
            mill = getNewstmp();
        }
        return mill;
    }
    private long getNewstmp() {
        return System.currentTimeMillis();
    }
}
package com.xiaojie.utils;
import com.xiaojie.template.DistributeLock;
import com.xiaojie.template.ZkDistributeLock;
public class IdGenerate implements Runnable {
    private DistributeLock distributeLock = new ZkDistributeLock();
    private SnowFlake snowFlake = new SnowFlake();
    @Override
    public void run() {
        String s = genId();
    }
    public String genId() {
        try {
            distributeLock.getLock();
            //获取到锁执行业务,获取不到等待
            String id = String.valueOf(snowFlake.nextId());
            System.out.println(">>>>>>>>>>>" + id);
            return id;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            distributeLock.unLock();
        }
        return null;
    }
}
public class Test {
    public static void main(String[] args) {
//        IdGenerate idGenerate = new IdGenerate();
        for (int i = 0; i < 100; i++) {
            new Thread(new IdGenerate()).start();
        }
    }
}


参考:https://blog.csdn.net/weixin_44455476/article/details/105397576

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
4月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
9天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
2月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
|
3月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
60 2
|
3月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
62 1
|
3月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
56 0
|
4月前
|
Java
分布式-Zookeeper-分布式锁
分布式-Zookeeper-分布式锁
|
4月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
6月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决