手把手教你实现基于Redis的分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

手把手教你实现基于Redis的分布式锁

  1. 概述
    目前,分布式系统已经是各大公司的标配,它具有高可用、可扩展等特点。在分布式系统中,由于存在多台机器上的进程竞争同一份资源的问题,因此需要分布式锁来保证同步访问资源。

一个经典的场景就是淘宝双11秒杀活动,全国人民的客户端访问不同的后端服务器,然后后端服务器再访问数据库,此时数据库就是需要同步访问的资源。

在介绍基于Redis实现的分布式锁之前;以Python语言为例,我们看看根据应用的实现架构,同步锁可能会有以下几种类型:

如果处理程序是单进程多线程的,在Python语言中,就可以使用 threading 模块的 Lock 对象来限制对共享资源的同步访问,实现多线程安全。
单机多进程的情况,在Python语言中,可以使用 multiprocessing 的 Lock 对象来保证多进程安全。
多机多进程部署的情况,需要依赖一个第三方组件(存储锁对象)来实现一个分布式的同步锁。

  1. 分布式锁的必要条件
    本文主要介绍第三种场景下基于Redis如何实现分布式锁。现在我们来看看实现一个分布式锁的必要条件有哪些?

原子性:加锁和释放锁的操作必须满足原子性
无死锁:不会发生死锁(PS:例如已获得锁的线程/进程在释放锁之前突然异常退出,导致其他线程/进程会一直在循环等待锁被释放)
互斥性:同一个时刻只能有一个线程/进程占有锁,其他线程/进程必须等待直到锁被释放
可重入性:当前线程/进程获得锁之后,还可以继续调用获取锁的操作,第二次以及之后的获取锁的操作不会被阻塞等待(PS:释放锁的操作也是一样的,调用多次之后,只有最后一次释放锁的时候才会真正地释放锁)--- 这个条件根据业务来决定是否需要实现

  1. 实现过程
    根据分布式锁的必要条件,下面将给出几种实现方式,来观察任意一个条件不满足时,会出现什么样的问题?在实现的过程中将使用同一份测试用例。测试用例代码如下:

test.py

'''
启用多个线程对 redis 中的 test_key 的值进行自增操作,理想情况,test_key 的值应该等于线程的数量,比如开了 10 个线程,test_key的值最终应该是10。
'''
def increase(redis, lock, key):

# 获得锁
lock_value = lock.get_lock(key)
value = redis.get(key)
# 模拟实际情况下进行的某些耗时操作
time.sleep(0.1)
value += 1
redis.set(key, value)
thread_name = threading.current_thread().name
# 打印线程名和最新的值
print thread_name, new_value
# 释放锁
lock.del_lock(key, lock_value)

连接服务端

redis = RedisCli(REDIS_CACHE_HOST_LIST, REDIS_CACHE_MASTER_NAME)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in xrange(thread_count):

thread = threading.Thread(target=increase, args=(redis, lock, key))
thread.start()

Tips:
下面的代码片段中只展示需要修改的部分,其他部分和test.py保持一致。

3.1 原子性
在这个版本中,当线程 A get(lock_key) 的值为空时,set lock_key 的值为 1,并返回,这表示线程 A 获得了锁,可以继续执行后面的操作,否则需要一直循环去获取锁,直到 key 的值再次为空,重新获得锁,执行任务完成后释放锁。

class RedisLock(object):

def __init__(self, rediscli):
    self.rediscli = rediscli

def _get_lock_key(self, key):
    lock_key = "lock_%s" % key
    return lock_key

def get_lock(self, key):
    lock_key = self._get_lock_key(key)
    while True:
        value = self.rediscli.get(lock_key)
        if not value:
            self.rediscli.set(lock_key, '1')
            return True
        time.sleep(0.01)

def del_lock(self, key, new_expire_time):
    lock_key = self._get_lock_key(key)
    return self.rediscli.delete(lock_key)

执行test.py测试脚本,得到的结果如下:

Thread-1 1
Thread-5 2
Thread-2 2
Thread-6 3
Thread-7 3
Thread-4 3
Thread-9 4
Thread-8 5
Thread-10 5
Thread-3 5
观察输出结果发现,同时有多个线程输出的结果是一样的。初看上面加锁的代码逻辑似乎没什么问题,但是最终的结果却事与愿违,原因是上面的代码get(lock_key)和set(lock_key, '1')并不是原子性的执行,而是分开执行。A 线程在get(lock_key)的时候发现是空值,于是重新set(lock_key, '1'),但在get操作之后,set操作之前,B 线程恰好执行了get(lock_key),此时B 线程的get操作得到的还是空值,然后也顺利获得锁,导致数据被两个或多个线程同时修改,最后出现不一致。

3.2 无死锁
由于3.1的版本是因为get_lock方法不是原子性操作,造成两个或多个线程同时获得锁的问题,这个版本改成使用 redis 的 setnx 命令来进行锁的查询和设置操作,setnx 即 set if not exists,顾名思义就是当key不存在的时候才设置 value,并返回 1,如果 key 已经存在,则不进行任何操作,返回 0。

只展示需要修改的部分,其他部分还是和3.1的代码一样

def get_lock(self, key):

lock_key = self._get_lock_key(key)
thread_name = threading.current_thread().name
while True:
    value = self.rediscli.setnx(lock_key, 1)
    if value:
        return True
    time.sleep(0.01)
    print "{} waiting...".format(thread_name)

执行test.py测试脚本,得到的结果如下:

Thread-1 1
Thread-4 2
Thread-2 3
Thread-3 4
Thread-7 5
Thread-6 6
Thread-5 7
Thread-8 8
Thread-9 9
Thread-10 10
输出结果是正确的,但是还有潜在的问题。比如假设 A 线程获得了锁后,由于某种异常原因导致线程crash了,这个时候锁将无法被释放。稍微修改一下测试用例的 increase 函数,模拟某个线程在释放锁之前因为异常退出。

test-3-2.py

def increase(redis, lock, key):

thread_name = threading.current_thread().name
lock_value = lock.get_lock(key)
value = redis.get(key)
if not value:
    value = 0
# 模拟实际情况下进行的某些耗时操作
time.sleep(0.1)
value = int(value) + 1
redis.set(key, value)
print thread_name, value
# 模拟线程2异常退出
if thread_name == 'Thread-2':
    print '{} crash...'.format(thread_name)
    import sys
    sys.exit(1)
lock.del_lock(key, lock_value)

执行test-3-2.py测试脚本,得到的结果如下:

Thread-2 3
Thread-2 crash...
Thread-7 waiting...
Thread-3 waiting...
Thread-5 waiting...
Thread-4 waiting...
Thread-9 waiting...
Thread-6 waiting...
Thread-10 waiting...
此时就会出现问题,当线程2 crash 之后,后续获取锁的线程一直获取不了锁,一直处于等待锁的状态,于是产生了死锁。如果请求是多线程处理的,比如每来一个请求就开一个线程去处理,那么堆积的线程会逐渐增多,最终可能会导致系统崩溃。

当获得锁的线程异常退出后,无法主动释放锁,因此需要找到一种方式即使线程异常退出,线程占用的锁也能够被释放,显然我们需要一种被动释放锁的机制。从 redis 2.6.12 版本开始,set 命令就已经支持了 nx 和 expire 功能。改进代码如下:

def get_lock(self, key, timeout=3):

lock_key = self._get_lock_key(key)
while True:
    value = self.rediscli.set(lock_key, '1', nx=True, ex=timeout)
    if value:
        return True
    time.sleep(0.01)

执行test.py测试脚本,得到的结果如下:

Thread-1 1
Thread-9 2
Thread-6 3
Thread-2 4
Thread-4 5
Thread-5 6
Thread-8 7
Thread-3 8
Thread-7 9
Thread-10 10
执行test-3-2.py测试脚本,模拟 线程2 crash,得到的结果如下:

Thread-1 1
Thread-2 2
Thread-2 crash...
Thread-10 3
Thread-7 4
Thread-4 5
Thread-8 6
Thread-3 7
Thread-9 8
Thread-6 9
Thread-5 10
从上面的运行结果来看,似乎已经解决了原子性和无死锁的问题。那第三个条件互斥性是否满足呢?正常情况下,3.2节的实现方式是满足互斥性的,但是还有一种场景需要我们考虑:比如假设 A 线程的逻辑还没处理完,但是锁由于过期时间到了,导致锁自动被释放掉,这时 B 线程获得了锁,开始处理 B 的逻辑,然后 A 进程的逻辑处理完了,B 线程还在处理中,就把 B 线程的锁给删除了。通过修改一下测试用例,模拟一下这种场景。

def increase(redis, lock, key):

thread_name = threading.current_thread().name
# 设置锁的过期时间为2s
lock_value = lock.get_lock(key, thread_name, timeout=2)
value = redis.get(key)
if not value:
    value = 0
# 模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间
time.sleep(2.5)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)

我们让线程的执行时间大于锁的过期时间,导致锁到期自动释放。执行上面的测试脚本,得到的结果如下:

Thread-1 1
Thread-3 1
Thread-2 2
Thread-9 2
Thread-5 3
Thread-7 3
Thread-6 4
Thread-4 4
Thread-8 5
Thread-10 5
既然这种现象是由于锁过期导致误删其他线程的锁引发的,那我们就顺着这个思路,强制线程只能删除自己设置的锁。如果是这样,就需要为每个线程的锁添加一个唯一标识。在我们的分布式锁实现机制中,我们每次添加锁的时候,都是给 lock_key 设为 1,无论是 key 还是 value,都不具备唯一性,如果把 key 设为唯一的,那么在分布式系统中需要产生 N (等于总线程数)个 key 了 ,从直观性和维护性上来说,这都是不可取的。因此只能将 value 设置为每个线程的唯一标识。这个唯一标识由线程 ID + 进程的 PID + 机器的 IP + 时间戳 + 集群名称组成,这样就构成了一个线程锁的唯一标识。

3.3 互斥性
根据上一节最后的分析,我们设计出了基于Redis实现分布式锁的最终版。

最终版

class RedisLock(object):

def __init__(self, rediscli):
    self.rediscli = rediscli.master
    # ip 在实例化的时候就获取,避免过多访问DNS
    self.ip = socket.gethostbyname(socket.gethostname())
    self.pid = os.getpid()
    self.cluster = "hna"

def _gen_lock_key(self, key):
    lock_key = "lock_%s" % key
    return lock_key

def _gen_unique_value(self):
    thread_name = threading.current_thread().name
    time_now = time.time()
    unique_value = "{0}-{1}-{2}-{3}-{4}".format(self.ip, self.pid, thread_name, self.cluster, time_now)
    return unique_value

def get_lock(self, key, timeout=3):
    lock_key = self._gen_lock_key(key)
    unique_value = self._gen_unique_value()
    logger.info("unique value %s" % unique_value)
    while True:
        value = self.rediscli.set(lock_key, unique_value, nx=True, ex=timeout)
        if value:
            # 注意,我们返回了唯一标识,用于后面的delete时检查是否是当前线程的锁
            return unique_value
        # 进入阻塞状态,避免一直消耗CPU
        time.sleep(0.1)

def del_lock(self, key, value):
    lock_key = self._gen_lock_key(key)
    old_value = self.rediscli.get(lock_key)
    # 检查是否是当前线程持有的锁
    if old_value == value:
        return self.rediscli.delete(lock_key)

执行test.py测试脚本,得到的结果如下:

Thread-1 1
Thread-2 2
Thread-4 3
Thread-5 4
Thread-10 5
Thread-3 6
Thread-9 7
Thread-6 8
Thread-8 9
Thread-7 10
修改test.py测试脚本,测试一下锁过期。测试脚本如下:

test-3-3.py

def increase(redis, lock, key):

thread_name = threading.current_thread().name
lock_value = lock.get_lock(key, timeout=1)
value = redis.get(key)
if not value:
    value = 0
# 模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间
time.sleep(3)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)

执行test-3-3.py测试脚本,得到的结果如下:

Thread-1 1
Thread-2 1
Thread-5 1
Thread-6 2
Thread-8 2
Thread-10 2
Thread-9 3
Thread-3 3
Thread-4 3
Thread-7 4
从运行test-3-3.py测试脚本结果来看,问题没有得到解决。这是为什么呢?因为我们设置value的唯一性只能确保线程不会误删其他线程产生的锁,不会出现一连串的误删锁的情况,比如 A 删了 B 的锁,B 执行完删了 C 的锁。使用 redis 的过期机制,只要业务的处理时间大于锁的过期时间,就没有一个很好的方式来避免由于锁过期导致其他线程同时占有锁的问题,所以需要熟悉业务的执行时间,来合理地设置锁的过期时间。(PS:对于这种情况,一般的处理方式是获得锁的线程开启一个守护线程,用来给快要过期的锁"续航"。比如过去了29秒,线程A还没执行完,这时候守护线程会执行expire指令,为这把锁"续航"20秒。守护线程从第29秒开始执行,每20秒执行一次检查。当线程A执行完任务,会显式关掉守护线程。线程A的进程或者守护进程异常退出,这把锁将自动超时释放,从而不会导致死锁。)

另外,需要注意的一点是:3.3节的实现方式中,删除锁(del_lock)的操作不是原子性的,先是拿到锁,再判断锁的值是否相等,相等的话最后再删除锁,既然不是原子性的,就有可能存在这样一种极端情况:在判断的那一时刻,锁正好过期了,被其他线程占有了锁,那最后一步的删除,就可能会造成误删其他线程的锁。因此推荐使用官方提供的 Lua 脚本来确保原子性:

def del_lock(self, key, value):

if redis.call("get",key) == value then
    return redis.call("del",key)
else
    return 0
  1. 总结
    以上就是我们使用 Redis 来实现一个分布式同步锁的方式,其特点是:

加锁和释放锁是原子性的
满足互斥性,同一个时刻只能有一个线程可以获取锁和释放锁
利用 Redis 的 ttl机制和守护进程的方式来保证不会出现死锁
以上的方案中,我们是假设 Redis 服务端是单集群且高可用的,忽视了以下的问题:

如果某一时刻 Redis master 节点发生了故障,集群中的某个 slave 节点变成 master 节点,在故障迁移(failover)过程中可能出现原 master 节点上的锁没有及时同步到 slave 节点,导致其他线程同时获得锁。对于这个问题,可以参考 Redis 官方推出的 redlock 算法,但是比较遗憾的是,该算法也没有很好地解决锁过期的问题。(PS:不过这种不安全也仅仅是在主从发生 failover 的情况下才会产生,而且持续时间极短,业务系统多数情况下可以容忍。)

  1. 参考资料
    漫画:什么是分布式锁?

基于 redis 的分布式锁实现
redis分布式锁深度剖析(超时情况)
SET key value
Distributed locks with Redis
原文地址https://www.cnblogs.com/wengle520/p/12484931.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
15天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
22天前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
40 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
15天前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
35 1
|
19天前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
53 4
|
23天前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
41 4
|
23天前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
42 3
|
25天前
|
存储 NoSQL 关系型数据库
【redis】认识redis和分布式系统
【redis】认识redis和分布式系统
22 1
|
20天前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
NoSQL Java 关系型数据库
浅谈Redis实现分布式锁
浅谈Redis实现分布式锁
|
NoSQL Redis 数据库
用redis实现分布式锁时容易踩的5个坑
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 近有不少小伙伴投入短视频赛道,也出现不少第三方数据商,为大家提供抖音爬虫数据。 小伙伴们有没有好奇过,这些数据是如何获取的,普通技术小白能否也拥有自己的抖音爬虫呢? 本文会全面解密抖音爬虫的幕后原理,不需要任何编程知识,还请耐心阅读。
用redis实现分布式锁时容易踩的5个坑