分布式锁实现原理与最佳实践(3)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式锁实现原理与最佳实践

3.4 zookeeper 瞬时znode节点 + watcher监听机制

临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:

  • 多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;
  • 其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;
  • 下一个序号的线程得到通知,继续执行;
  • 以此类推,创建节点的时候,就确认了线程执行的顺序。

image.png

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.14</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:

当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:

/**
 * 自己本身就是一个 watcher,可以得到通知
 * AutoCloseable 实现自动关闭,资源不使用的时候
 */
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
    private ZooKeeper zooKeeper;
    /**
     * 记录当前锁的名字
     */
    private String znode;
    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost:2181",
                10000,this);
    }
    public boolean getLock(String businessCode) {
        try {
            //创建业务 根节点
            Stat stat = zooKeeper.exists("/" + businessCode, false);
            if (stat==null){
                zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
            //创建瞬时有序节点  /order/order_00000001
            znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            //获取业务节点下 所有的子节点
            List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
            //获取序号最小的(第一个)子节点
            Collections.sort(childrenNodes);
            String firstNode = childrenNodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endsWith(firstNode)){
                return true;
            }
            //如果不是第一个子节点,则监听前一个节点
            String lastNode = firstNode;
            for (String node:childrenNodes){
                if (znode.endsWith(node)){
                    zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
                    break;
                }else {
                    lastNode = node;
                }
            }
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode,-1);
        zooKeeper.close();
        log.info("我已经释放了锁!");
    }
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                notify();
            }
        }
    }
}


3.5 zookeeper curator

在实际的开发中,不建议去自己“重复造轮子”,而建议直接使用Curator客户端中的各种官方实现的分布式锁,例如其中的InterProcessMutex可重入锁。

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.2.0</version>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>
@Bean(initMethod="start",destroyMethod = "close")
public CuratorFramework getCuratorFramework() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.
        newClient("localhost:2181", retryPolicy);
    return client;
}

框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。

官网中分布式锁的实现是在curator-recipes依赖中,不要引用错了。

@Autowired
private CuratorFramework client;
@Test
public void testCuratorLock(){
    InterProcessMutex lock = new InterProcessMutex(client, "/order");
    try {
        if ( lock.acquire(30, TimeUnit.SECONDS) ) {
            try  {
                log.info("我获得了锁!!!");
            }
            finally  {
                lock.release();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    client.close();
}


3.6 Redission

重新实现了Java并发包下处理并发的类,让其可以跨JVM使用,例如CHM等。

✪ 3.6.1 非SpringBoot项目引入

https://redisson.org/

引入Redisson的依赖,然后配置对应的XML即可:

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.11.2</version>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>

编写相应的redisson.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:redisson="http://redisson.org/schema/redisson"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://redisson.org/schema/redisson
       http://redisson.org/schema/redisson/redisson.xsd
">
    <redisson:client>
        <redisson:single-server address="redis://127.0.0.1:6379"/>
    </redisson:client>
</beans>

配置对应@ImportResource("classpath*:redisson.xml")资源文件。

✪ 3.6.2 SpringBoot项目引入

或者直接使用springBoot的starter即可。

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.19.1</version>
</dependency>

修改application.properties即可:#spring.redis.host=

✪ 3.6.3 设置配置类

@Bean
public RedissonClient getRedissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}

✪ 3.6.4 使用

@Test
public void testRedissonLock() {
    RLock rLock = redisson.getLock("order");
    try {
        rLock.lock(30, TimeUnit.SECONDS);
        log.info("我获得了锁!!!");
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        log.info("我释放了锁!!");
        rLock.unlock();
    }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
存储 Dubbo Java
分布式 RPC 底层原理详解,看这篇就够了!
本文详解分布式RPC的底层原理与系统设计,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 RPC 底层原理详解,看这篇就够了!
|
16天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
48 4
|
4月前
|
存储 调度
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
|
1月前
|
NoSQL Java API
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
本文详细解析了分布式锁的实现原理与应用场景,包括线程锁、进程锁和分布式锁的区别,以及分布式锁的四种要求和三种实现方式(数据库乐观锁、ZooKeeper、Redis)。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
|
5月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
5月前
|
NoSQL Java Redis
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
分布式锁实现原理问题之使用Redis的setNx命令来实现分布式锁问题如何解决
100 0
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
41 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
53 1
|
2月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
140 0
|
3月前
|
网络协议 安全 Java
分布式(基础)-RMI的原理
分布式(基础)-RMI的原理