序言
前面的文章都是理论知识,写多了头有点大,突然想写点实战方面的内容,刚好最近公司在做异步任务迁移,用到了分布式锁和任务分片,所以打算写2篇实战方面的文章,分别介绍分布式锁和任务分片的实现方式,这个在实际项目中,应该会经常用到,今天这篇文章就先讲解分布式锁的实现方式。
使用场景
分布式锁的使用场景其实很多,在小米这边我主要遇到以下场景:
- 在服务集群中执行定时任务,我们希望只有一台机器去执行,就需要用到分布式锁,只有拿到锁的机器,才能执行该定时任务;
- 当外部请求打到集群中时,比如该请求是对订单进行操作,为了避免请求重入,我们需要在入口加上订单维度的分布式锁。
Redis分布式锁
Redis分布式锁是面试常面的考题,很多同学都知道用SetNx()去获取锁,如果面试官问你下面2个问题,你知道怎么回答么?
- 如果获取锁的机器挂掉,如何处理?
- 当锁超时时,A/B两台机器同时获取锁,可能会同时获取,如何解决?
其实Redis分布式锁,肯定不仅仅是SetNx()就能解决的,什么?你不知道什么是SetNx(),楼哥是暖男嘛,马上给你解答:
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。(返回值:设置成功,返回 1,设置失败,返回 0)
如果调用SetNx()返回1,表示获取到锁,如果返回0,表示没有获取到锁,为了避免机器宕机&重启,导致锁一直没有释放,所以我们需要记录锁的超时时间,整体执行流程如下:
- 先通过SetNx()获取锁,并将value设置成超时时间,如果成功获取锁,直接返回;
- 如果未获取到锁,可能是机器宕机&重启等,需要通过GetKey()获取锁的超时时间value,如果锁未超时,证明机器未宕机&重启,获取锁失败;
- 如果锁已经超时,就可以重新去获取锁,并设置锁的新的超时时间,为了避免多台机器机器同时拿到锁,需要使用GetSet()方法,因为GetSet()会返回之前的旧值,如果此时有两台机器A/B同时执行GetSet()方法,假如A先执行,B后执行,那么A调用GetSet()返回的值,其实就等于之前调用GetKey()获取的的值current_time,B调用GetKey()返回的值,其实就是A设置的新值,肯定不等于current_time,所以我们可以通过两个时间是否相等,来判断是谁先拿到锁。(这里应该是分布式锁最难理解的地方,我每次重温这个逻辑,都会在这个地方卡一下。。。)
Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。(返回值:返回给定 key 的旧值。当 key 没有旧值时,即 key 不存在时,返回 nil;当 key 存在但不是字符串类型时,返回一个错误。)
可能有同学说,写了一堆,看的我头都大了,来来来,楼哥给你画了一幅图,是不是就清晰很多
具体实现
基本原理讲清楚了,下面就开始堆代码了哈,先看看获取锁的逻辑,里面的注释写的相当详细,即使不懂编程的同学,应该都能看懂:
// 获取分布式锁,需要考虑以下情况: // 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置; // 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。 func GetDistributeLock(key string, expireTime int64) bool { currentTime := time.Now().Unix() expires := currentTime + expireTime redisAlias := "jointly" // 1.获取锁,并将value值设置为锁的超时时间 redisRet, err := redis.SetNx(redisAlias, key, expires) if nil == err && utils.MustInt64(1) == redisRet { // 成功获取到锁 return true } // 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁 // 2.1 获取锁的超时时间 currentLockTime, err := redis.GetKey(redisAlias, key) if err != nil { return false } // 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回 if utils.MustInt64(currentLockTime) >= currentTime { return false } // 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间 oldLockTime, err := redis.GetSet(redisAlias, key, expires) if err != nil { return false } // 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁 // 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间 if utils.MustString(oldLockTime) == currentLockTime { return true } return false }
对于里面的一些函数utils.MustString()、utils.MustInt64(),其实就是一些底层封装好的类型转换函数,应该不会影响大家理解哈,如果想直接拿去使用,这里需要简单修改一下。
再看看删除锁的逻辑:
// 删除分布式锁 // @return bool true-删除成功;false-删除失败 func DelDistributeLock(key string) bool { redisAlias := "jointly" redisRet := redis.Del(redisAlias, key) if redisRet != nil { return false } return true }
然后是业务处理逻辑:
func DoProcess(processId int) { fmt.Printf("启动第%d个线程\n", processId) redisKey := "redis_lock_key" for { // 获取分布式锁 isGetLock := GetDistributeLock(redisKey, 10) if isGetLock { fmt.Printf("Get Redis Key Success, id:%d\n", processId) time.Sleep(time.Second * 3) // 删除分布式锁 DelDistributeLock(redisKey) } else { // 如果未获取到该锁,为了避免redis负载过高,先睡一会 time.Sleep(time.Second * 1) } } }
最后起个10个多线程,去执行这个DoProcess():
func main() { // 初始化资源 var group string = "i18n" var name string = "jointly_shop" var host string // 初始化资源 host = "http://ip:port" _, err := xrpc.NewXRpcDefault(group, name, host) if err != nil { panic(fmt.Sprintf("initRpc when init rpc failed, err:%v", err)) } redis.SetRedis("jointly", "redis_jointly") // 开启10个线程,去抢Redis分布式锁 for i := 0; i <= 9; i ++ { go DoProcess(i) } // 避免子线程退出,主线程睡一会 time.Sleep(time.Second * 100) return }
程序跑了100s,我们可以看到,每次都只有1个线程获取到锁,分别是2、1、5、9、3,执行结果如下:
启动第0个线程 启动第6个线程 启动第9个线程 启动第4个线程 启动第5个线程 启动第2个线程 启动第1个线程 启动第8个线程 启动第7个线程 启动第3个线程 Get Redis Key Success, id:2 Get Redis Key Success, id:2 Get Redis Key Success, id:1 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:5 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:9 Get Redis Key Success, id:3 Get Redis Key Success, id:3 Get Redis Key Success, id:3 Get Redis Key Success, id:3 Get Redis Key Success, id:3
遇到的坑
中间出现过一些坑,我简单说一下:
- 之前我们做过一次服务迁移,需要将物理机迁移到Neo云,当把流量从物理机迁移Neo云后,千万不要忘了停掉物理机上的定时任务,否则物理机会去抢占这个分布式锁,特别是代码有变更后,物理机如果抢到锁,会继续执行旧的代码,那就是个大坑了。
- 不要轻易去修改分布式锁的超时时间,之前为了能快速排查问题,修改过一次,然后出现了非常诡异的问题,当时排查了一天,具体问题也记不太清了,大家感兴趣,可以自己模拟一下。
后记
这个分布式锁其实是我2019年写的,已经在线上跑了2年,只需要进行简单修改,就可以放到线上跑,不用担心里面有坑哈,因为坑已经被我趟过了。
上周写了一篇限流的文章,加上今天这个分布式锁,其实都是最近项目中使用的,所以就整理一下,其实我最想写的,是任务分片的实现方式,也是最近在公司做异步任务时Get到的新技能,它支持多机并发执行一个任务,是不是很神奇,后面会分享给大家。