zookeeper实现分布式锁原理

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: zookeeper实现分布式锁原理

zookeeper典型使用场景实战

1,zookeeper实现分布式锁

1,之前分析过一篇redis分布式锁实现的原理,今天谈谈zookeeper实现分布式锁的原理。由于每个结点都具有独占性,因此可以通过zookeeper的监听机制,来保证这个锁被谁占有。

zookeeper实现分布式锁的流程如下

大致流程如下:就是说可以对一个结点进行加锁,当其他结点要来加锁时就判断这个结点是否被其他事务创建,如果没有被创建,那么就可以创建当前事务,并且获取当前锁;如果被创建,那么要来获取锁的这个结点可以对这个拥有锁的结点进行监听,当当前锁被释放,就是结点被删除的时候,那么其他结点就可以通过这个监听机制,获取到锁被释放的消息,那么就可以来竞争这把锁了。因此这把锁也是一把非公平锁。

如上实现方式在并发问题比较严重的情况下,性能会下降的比较厉害,主要原因是,所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,这就是羊群效应,会大大的降低系统的效率。

1.1,公平锁实现

1,请求进来,直接创建一个顺序的临时结点。

2,判断当前结点是否最小结点,如0最小,其次是1,2,3,4…,如果是最小的,那么就获取锁,如果不是最小的,那么就会对前面的结点进行监听,如3监听2,2监听1,依次类推。

3,处理完释放锁,由于1监听0,因此1最先收到锁释放的通知,从而实现这种顺序性,公平锁。

借助于这种临时顺序结点,可以避免同时多个结点的并发竞争锁,从而缓解服务端压力。

2,代码方式实现

可以模拟一个减库存的操作

@Transactional
public void reduceStock(Integer id){
    // 1.获取库存
    Product product = productMapper.getProduct(id);
    // 模拟耗时业务处理
    sleep( 500); // 其他业务处理
    if (product.getStock() <=0 ) {
        throw new RuntimeException("out of stock");
    }
    // 2.减库存
    int i = productMapper.deductStock(id);
    if (i==1){
        Order order = new Order();
        order.setUserId(UUID.randomUUID().toString());
        order.setPid(id);
        orderMapper.insert(order);
    }else{
        throw new RuntimeException("deduct stock fail, retry.");
    }
}
 /**
  * 模拟耗时业务处理
  * @param wait
  */
public void sleep(long  wait){
    try {
        TimeUnit.MILLISECONDS.sleep( wait );
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

当然如果存在多个jvm进程的话,那么一定会出现超卖的问题,这个就可以使用这个zookeeper的分布式锁来解决这个问题。因此接下来通过创建一个zookeeper的互斥锁来解决这个超卖问题,主要通过这个InterProcessMutex这个互斥锁实现。

@Autowired
private OrderService orderService;
@Autowired
CuratorFramework curatorFramework;
@PostMapping("/stock/deduct")
public Object reduceStock(Integer id) throws Exception {
  //互斥锁
    InterProcessMutex ipm = new InterProcessMutex(curatorFramework, "/product_" + id);
    try {
        // xxx业务逻辑
        //加锁
        interProcessMutex.acquire();
        //调用这个减库存的方法
        orderService.reduceStock(id);
    } catch (Exception e) {
        if (e instanceof RuntimeException) {
            throw e;
        }
    }finally {
        //释放锁
        interProcessMutex.release();
    }
    return "ok" ;
}

2.1,InterProcessMutex底层

/**
 * CuratorFramework:Curator客户端
 * path:结点
 * 主要是通过new这个StandardLockInternalsDriver类实现
*/
InterProcessMutexpublic InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}

2.2,加锁的的具体细节

//加锁
interProcessMutex.acquire();

acquire方法的具体实现如下,

public void acquire() throws Exception {
    if (!this.internalLock(-1L, (TimeUnit)null)) {
        throw new IOException(this.basePath);
    }
}

通过这个internalLock方法来判断是否加过锁,如果没有加过锁就会调用attemptLock方法来进行加锁

private boolean internalLock(long time, TimeUnit unit) throws Exception {
    //获取当前线程
    Thread currentThread = Thread.currentThread();
    //InterProcessMutex互斥锁中判断是否已经加过锁
    InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
    //加过锁
    if (lockData != null) {
        //将数据进行加1的操作
        lockData.lockCount.incrementAndGet();
        return true;
    } 
    else {
        //进行一个加锁
        String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
        //判断是否加锁成功 
        if (lockPath != null) {
            InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
            this.threadData.put(currentThread, newLockData);
            return true;
        } else {
        return false;
        }
    }
}

加锁的方式具体如下,主要是在这个方法里面attemptLock(),具体流程如下,首先会去创建一个结点,然后会去判断当前结点是不是最小的结点,如果当前结点是最小的结点,那么就会获取锁,如果当前结点不是最小的结点,那么就会对前面的结点进行监听,内部会有一些排序等,监听等的操作,最后结点处理完之后会去释放锁。

 while(!isDone) {
    isDone = true;
  try {
        //创建结点
    ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
        //创建创建成功之后判断当前节点是不是最小的子结点,内部会进行这个排序的操作
        //最小的结点可以获取锁
    hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
    } catch (NoNodeException var14) {
        if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                throw var14;
            }
            isDone = false;
        }
    }
  return hasTheLock ? ourPath : null;

createsTheLock,创建锁的方式如下

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        if (lockNodeBytes != null) {
            //通过容器结点实现,如果没有容器结点,则创建容器结点
            //如果有容器结点,那么就在容器节点中创建这个临时的顺序结点
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes); 
        } else {
            //临时的顺序节点
            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
        }
        return ourPath;
    }

internalLockLoop里面的方法如下,主要是用来判断当前节点是否最小结点,即使这个结点是一个有序的临时顺序结点,但是由于是需要通过网络获取,因此需要重新进行排序,从而判断当前结点是不是最小结点,如果不是,就会根据排好的顺序,放入到指定的位置,从而监听上一个临时顺序结点。

while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
    //获取全部的这个子结点
    List<String> children = this.getSortedChildren();
    String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
    PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
      if (predicateResults.getsTheLock()) {
          haveTheLock = true;
      } else {
          String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
           synchronized(this) {
               try {  ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                  if (millisToWait == null) {
                      this.wait();
                  } else {
                      millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                       startMillis = System.currentTimeMillis();
                       if (millisToWait > 0L) {
                           this.wait(millisToWait);
                       } else {
                           doDelete = true;
                           break;
                       }
                   }
               } catch (NoNodeException var19) {
                   ;
               }
           }
       }
   }

3,redis和zookeeper分布锁的区别

redis在使用主从复制时,master主节点挂掉之后,slave故障转移时生成master,会有可能造成因为原master挂掉的数据故障丢失的情况。同时redis使用rdb和aof两种方式做持久化,如果出现宕机的情况也可能会造成数据的丢失问题。在设计方面,只需要保证主节点中有数据就行,通过主从复制的方式来保证这个结点数据的一致性,并且主从复制是异步的,也就是说往master主节点中写入数据和slaver去同步这个master结点中的数据是异步的,两个步骤是各位各的,由于这种异步也会造成这个数据丢失的问题。因此更多保证的是这个ap协议,即保证了这个redis分布式锁的高可用


zookeeper使用的是leader-follower模式,需要多个结点同时写成功才算成功,因此可以保证多个结点里面都会有相同的数据,follower结点会及时的去同步主节点中的数据。因此在保障这个多个结点数据的一致性能,因此这个zookeeper更多的是保证这个cp协议,一致性。主从的一致性主要是通过ZAB(ZooKeeper Atomic Broadcast)协议实现的,即ZooKeeper原子播送协议。


4,zookeeper其他业务场景

如果只有读的情况,是不需要所有的结点都进行加锁的。

如果有读有些的话,可能遇到的问题:

1,读写并发不一致

2,读写不一致,即网络产生的延迟问题,在写完数据库之后,更新缓存出现网络问题,被别的线程修改该数据并更新缓存。

解决方案


1,通过共享锁实现,即读写锁实现。如果前面的结点是读锁,则直接获取锁,如果当前结点是一个写锁,则不能直接获取锁,需要对前面的结点进行监听的操作。

2,write写操作,和这个互斥锁的原理一样,需要对前面的结点进行这个监听操作。


5,总结

5.1,分布式锁原理总结

主要是利用zookeeper的临时顺序结点的特性,从而保障这个锁的公平性,从而解决并发上锁竞争的问题,并且结合这个watch监听的机制,后一个结点结点监听前一个结点,解决了zookeeper分布式锁的羊群效应,所谓羊群效应就是说,当一个节点挂掉后,所有结点都去监听,然后做出反应,这样会给服务器带来巨大压力。有了临时顺序节点以及节点监听机制,当一个节点挂掉,只有它后面的那一个节点才做出反应。并且通过这个ZAB协议,保证数据的一致性。


5.2,zookeeper分布式锁底层流程总结

0b5afbcd72314b26a3d68c26e92d2eba.png

通过上面的源码分析可知,首先会去新建一个container的容器结点,也可以创建一个持久化结点,如对面的 /lock,接下来创建这个顺序的临时结点,大小长度为10位。创建一个结点之后,内部会做一个排序,判断当前节点是不是最小结点,如果当前结点是最小结点,那么当前结点就可以获取到锁,如果不是最小结点,就会根据这个临时顺序结点的特性,将此结点加入到最后面进行排队,并且监听前面一个结点。一旦前面这个结点有了释放锁的消息,那么么当前结点就可以去获取锁了,获取锁之后,就开始处理具体的业务流程,处理完业务流程之后,再去释放锁。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
131 2
|
4天前
|
存储 分布式计算 Hadoop
Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 01】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)
103 3
|
4天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
141 2
|
1天前
|
前端开发 JavaScript 算法
分布式系统的一致性级别划分及Zookeeper一致性级别分析
分布式系统的一致性级别划分及Zookeeper一致性级别分析
|
4天前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
4天前
|
存储 供应链 安全
区块链技术原理及应用:深入探索分布式账本技术
【4月更文挑战第30天】区块链,从加密货币的底层技术延伸至多元领域,以其分布式账本、去中心化、不可篡改性及加密技术重塑数据存储与交易。核心组件包括区块、链和节点,应用涵盖加密货币、供应链管理、金融服务等。尽管面临扩展性等挑战,未来潜力无限。
|
4天前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?
|
4天前
|
存储 运维 分布式计算
面经:HDFS分布式文件系统原理与故障排查
【4月更文挑战第10天】本文深入剖析了HDFS的底层原理和面试重点,包括HDFS的架构(NameNode、DataNode、Secondary NameNode)、文件读写流程、高级特性(快照、Erasure Coding、Federation、High Availability)以及故障排查方法。通过HDFS Shell命令示例,加强理解,并对比了HDFS与其他分布式文件系统的优缺点。掌握这些知识将有助于求职者在面试中脱颖而出,应对HDFS相关技术考察。
46 3
|
4天前
|
Java 网络安全 Apache
搭建Zookeeper集群:三台服务器,一场分布式之舞
搭建Zookeeper集群:三台服务器,一场分布式之舞
53 0
|
4天前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
170 4