Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码

章节内容

上节我们完成了:


ZooKeeper的Leader选举机制

ZooKeeper的选举过程

ZooKeeper的ZAB协议

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

分布式锁

出现问题1(单机器)

假设 Redis 里面的某个商品库存为1,此时两个用户同时下单,其中一个下单请求执行到第3步,更新数据库的库存为0,但是第4步还没执行。

而另外一个用户下单执行到了第二步,发现库存还是1,就会继续执行第3步。

但是此时库存已经为0了,所以数据库没有限制,此时会出现超卖的问题。

解决方案1

  • 用锁把2、3、4步锁住,让他们执行完后,另一个线程才能够继续执行。
  • 但是由于业务发展迅速,原来的单机已经不能够满足,此时增加一台机器后,会出现更严重的问题。

出现问题2(多机器)

假设有两个订单同时执行,分别有两个机器执行,那么这两个请求就是可以同时执行了,这样就依然出现了超卖的问题。

解决方案2

我们需要使用分布式锁来解决上面出现的问题。

分布式锁的作用就是在整个系统中提供一个全局的、唯一的锁,在分布式系统中每个系统进行相关的操作时都需要获取到该锁,才能够执行相应的操作。


ZK 分布式锁

实现思路

锁就是ZK指定目录下序号最小的临时节点,多个系统的多个线程都要在此目录下创建临时顺序节点,因为ZK会保证节点的顺序性,所以可以利用节点的顺序性进行锁判断。

每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是则获取锁失败。

获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除时,代表释放了锁。

流程图

编写代码

LockTest

package icu.wzk.zk.demo02;

public class LockTest {

    public static void main(String[] args) {
        for (int i = 0; i < 10; i ++) {
            // 启动10个
            new Thread(new LockRunnable()).start();
        }
    }

    static class LockRunnable implements Runnable {

        @Override
        public void run() {
            final ClientTest clientTest = new ClientTest();
            clientTest.getLock();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            clientTest.deleteLock();
        }
    }

}

ClientTest

package icu.wzk.zk.demo02;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ClientTest {

    private ZkClient zkClient = new ZkClient("h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181");

    String beforeNodePath;

    String currentNodePath;

    CountDownLatch countDownLatch = null;

    public ClientTest() {
        synchronized (ClientTest.class) {
            if (!zkClient.exists("/lock")) {
                zkClient.createPersistent("/lock");
            }
        }
    }

    public boolean tryGetLock() {
        if (null == currentNodePath || currentNodePath.isEmpty()) {
            currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock");
        }
        final List<String> childs = zkClient.getChildren("/lock");
        Collections.sort(childs);
        final String minNode = childs.get(0);
        if (currentNodePath.equals("/lock/" + minNode)) {
            return true;
        } else {
            final int i = Collections.binarySearch(childs, currentNodePath.substring("/lock/".length()));
            String lastNodeChild = childs.get(i - 1);
            beforeNodePath = "/lock/" + lastNodeChild;
        }
        return false;
    }

    public void waitForLock() {
        final IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                //
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                countDownLatch.countDown();
            }
        };
        zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);

        if (zkClient.exists(beforeNodePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);
    }

    public void deleteLock() {
        if (zkClient != null) {
            zkClient.delete(currentNodePath);
            zkClient.close();
        }
    }

    public void getLock() {
        final String threadName = Thread.currentThread().getName();
        if (tryGetLock()) {
            System.out.println(threadName + ": 获取到了锁!");
        } else {
            System.out.println(threadName + ": 没有获取到锁!");
            waitForLock();
            // 自己调用自己
            getLock();
        }
    }


}

运行结果

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
3天前
|
Java
在 Java 中捕获和处理自定义异常的代码示例
本文提供了一个 Java 代码示例,展示了如何捕获和处理自定义异常。通过创建自定义异常类并使用 try-catch 语句,可以更灵活地处理程序中的错误情况。
|
18天前
|
XML 安全 Java
Java反射机制:解锁代码的无限可能
Java 反射(Reflection)是Java 的特征之一,它允许程序在运行时动态地访问和操作类的信息,包括类的属性、方法和构造函数。 反射机制能够使程序具备更大的灵活性和扩展性
32 5
Java反射机制:解锁代码的无限可能
|
14天前
|
jenkins Java 测试技术
如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例详细说明
本文介绍了如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例,详细说明了从 Jenkins 安装配置到自动构建、测试和部署的全流程。文中还提供了一个 Jenkinsfile 示例,并分享了实践经验,强调了版本控制、自动化测试等关键点的重要性。
46 3
|
19天前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
56 10
|
15天前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
13天前
|
Java
Java代码解释++i和i++的五个主要区别
本文介绍了前缀递增(++i)和后缀递增(i++)的区别。两者在独立语句中无差异,但在赋值表达式中,i++ 返回原值,++i 返回新值;在复杂表达式中计算顺序不同;在循环中虽结果相同但使用方式有别。最后通过 `Counter` 类模拟了两者的内部实现原理。
Java代码解释++i和i++的五个主要区别
|
16天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)