如何用Redis实现分布式锁

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 前面的文章都是理论知识,写多了头有点大,突然想写点实战方面的内容,刚好最近公司在做异步任务迁移,用到了分布式锁和任务分片,所以打算写2篇实战方面的文章,分别介绍分布式锁和任务分片的实现方式,这个在实际项目中,应该会经常用到,今天这篇文章就先讲解分布式锁的实现方式。

序言


前面的文章都是理论知识,写多了头有点大,突然想写点实战方面的内容,刚好最近公司在做异步任务迁移,用到了分布式锁和任务分片,所以打算写2篇实战方面的文章,分别介绍分布式锁和任务分片的实现方式,这个在实际项目中,应该会经常用到,今天这篇文章就先讲解分布式锁的实现方式。


使用场景


分布式锁的使用场景其实很多,在小米这边我主要遇到以下场景:

  • 在服务集群中执行定时任务,我们希望只有一台机器去执行,就需要用到分布式锁,只有拿到锁的机器,才能执行该定时任务;
  • 当外部请求打到集群中时,比如该请求是对订单进行操作,为了避免请求重入,我们需要在入口加上订单维度的分布式锁。


Redis分布式锁


Redis分布式锁是面试常面的考题,很多同学都知道用SetNx()去获取锁,如果面试官问你下面2个问题,你知道怎么回答么?

  • 如果获取锁的机器挂掉,如何处理?
  • 当锁超时时,A/B两台机器同时获取锁,可能会同时获取,如何解决?

其实Redis分布式锁,肯定不仅仅是SetNx()就能解决的,什么?你不知道什么是SetNx(),楼哥是暖男嘛image.gif,马上给你解答:

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。(返回值:设置成功,返回 1,设置失败,返回 0)

如果调用SetNx()返回1,表示获取到锁,如果返回0,表示没有获取到锁,为了避免机器宕机&重启,导致锁一直没有释放,所以我们需要记录锁的超时时间,整体执行流程如下:

  • 先通过SetNx()获取锁,并将value设置成超时时间,如果成功获取锁,直接返回;
  • 如果未获取到锁,可能是机器宕机&重启等,需要通过GetKey()获取锁的超时时间value,如果锁未超时,证明机器未宕机&重启,获取锁失败;
  • 如果锁已经超时,就可以重新去获取锁,并设置锁的新的超时时间,为了避免多台机器机器同时拿到锁,需要使用GetSet()方法,因为GetSet()会返回之前的旧值,如果此时有两台机器A/B同时执行GetSet()方法,假如A先执行,B后执行,那么A调用GetSet()返回的值,其实就等于之前调用GetKey()获取的的值current_time,B调用GetKey()返回的值,其实就是A设置的新值,肯定不等于current_time,所以我们可以通过两个时间是否相等,来判断是谁先拿到锁。(这里应该是分布式锁最难理解的地方,我每次重温这个逻辑,都会在这个地方卡一下。。。)

Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。(返回值:返回给定 key 的旧值。当 key 没有旧值时,即 key 不存在时,返回 nil;当 key 存在但不是字符串类型时,返回一个错误。)

可能有同学说,写了一堆,看的我头都大了,来来来,楼哥给你画了一幅图,是不是就清晰很多image.gif

image.gif

具体实现


基本原理讲清楚了,下面就开始堆代码了哈,先看看获取锁的逻辑,里面的注释写的相当详细,即使不懂编程的同学,应该都能看懂:

// 获取分布式锁,需要考虑以下情况:
// 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置;
// 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。
func GetDistributeLock(key string, expireTime int64) bool {
 currentTime := time.Now().Unix()
 expires := currentTime + expireTime
 redisAlias := "jointly"
 // 1.获取锁,并将value值设置为锁的超时时间
 redisRet, err := redis.SetNx(redisAlias, key, expires)
 if nil == err && utils.MustInt64(1) == redisRet {
  // 成功获取到锁
  return true
 }
 // 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁
 // 2.1 获取锁的超时时间
 currentLockTime, err := redis.GetKey(redisAlias, key)
 if err != nil {
  return false
 }
 // 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回
 if utils.MustInt64(currentLockTime) >= currentTime {
  return false
 }
 // 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间
 oldLockTime, err := redis.GetSet(redisAlias, key, expires)
 if err != nil {
  return false
 }
 // 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁
 // 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间
 if utils.MustString(oldLockTime) == currentLockTime {
  return true
 }
 return false
}

对于里面的一些函数utils.MustString()、utils.MustInt64(),其实就是一些底层封装好的类型转换函数,应该不会影响大家理解哈,如果想直接拿去使用,这里需要简单修改一下。


再看看删除锁的逻辑:

// 删除分布式锁
// @return bool true-删除成功;false-删除失败
func DelDistributeLock(key string) bool {
 redisAlias := "jointly"
 redisRet := redis.Del(redisAlias, key)
 if redisRet != nil {
  return false
 }
 return true
}


然后是业务处理逻辑:

func DoProcess(processId int) {
 fmt.Printf("启动第%d个线程\n", processId)
 redisKey := "redis_lock_key"
 for {
  // 获取分布式锁
  isGetLock := GetDistributeLock(redisKey, 10)
  if isGetLock {
   fmt.Printf("Get Redis Key Success, id:%d\n", processId)
   time.Sleep(time.Second * 3)
   // 删除分布式锁
   DelDistributeLock(redisKey)
  } else {
   // 如果未获取到该锁,为了避免redis负载过高,先睡一会
   time.Sleep(time.Second * 1)
  }
 }
}


最后起个10个多线程,去执行这个DoProcess():

func main() {
 // 初始化资源
 var group string = "i18n"
 var name string = "jointly_shop"
 var host string
 // 初始化资源
 host = "http://ip:port"
 _, err := xrpc.NewXRpcDefault(group, name, host)
 if err != nil {
  panic(fmt.Sprintf("initRpc when init rpc  failed, err:%v", err))
 }
 redis.SetRedis("jointly", "redis_jointly")
 // 开启10个线程,去抢Redis分布式锁
 for i := 0; i <= 9; i ++ {
  go DoProcess(i)
 }
 // 避免子线程退出,主线程睡一会
 time.Sleep(time.Second * 100)
 return
}


程序跑了100s,我们可以看到,每次都只有1个线程获取到锁,分别是2、1、5、9、3,执行结果如下:

启动第0个线程
启动第6个线程
启动第9个线程
启动第4个线程
启动第5个线程
启动第2个线程
启动第1个线程
启动第8个线程
启动第7个线程
启动第3个线程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3


遇到的坑


中间出现过一些坑,我简单说一下:

  • 之前我们做过一次服务迁移,需要将物理机迁移到Neo云,当把流量从物理机迁移Neo云后,千万不要忘了停掉物理机上的定时任务,否则物理机会去抢占这个分布式锁,特别是代码有变更后,物理机如果抢到锁,会继续执行旧的代码,那就是个大坑了。
  • 不要轻易去修改分布式锁的超时时间,之前为了能快速排查问题,修改过一次,然后出现了非常诡异的问题,当时排查了一天,具体问题也记不太清了,大家感兴趣,可以自己模拟一下。


后记


这个分布式锁其实是我2019年写的,已经在线上跑了2年,只需要进行简单修改,就可以放到线上跑,不用担心里面有坑哈,因为坑已经被我趟过了。

上周写了一篇限流的文章,加上今天这个分布式锁,其实都是最近项目中使用的,所以就整理一下,其实我最想写的,是任务分片的实现方式,也是最近在公司做异步任务时Get到的新技能,它支持多机并发执行一个任务,是不是很神奇,后面会分享给大家。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
30天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
96 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
69 8
|
2月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
61 16
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
44 5
|
3月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
55 1
|
3月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
88 4
|
3月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
5月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
136 2
基于Redis的高可用分布式锁——RedLock
|
5月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】