纠正误区:这才是 SpringBoot Redis 分布式锁的正确实现方式

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 纠正误区:这才是 SpringBoot Redis 分布式锁的正确实现方式

我是码哥,可以叫我靓仔。

在说分布式锁之前,我们先说下为什么需要分布式锁。

在单机部署的时候,我们可以使用 Java 中提供的 JUC 锁机制避免多线程同时操作一个共享变量产生的安全问题。JUC 锁机制只能保证同一个 JVM 进程中的同一时刻只有一个线程操作共享资源。

一个应用部署多个节点,多个进程如果要修改同一个共享资源,为了避免操作乱序导致的并发安全问题,这个时候就需要引入分布式锁,分布式锁就是用来控制同一时刻,只有一个 JVM 进程中的一个线程可以访问被保护的资源。

分布式锁很重要,然而很多公司的系统可能还在跑着有缺陷的分布式锁方案,其中不乏一些大型公司。

所以,码哥今天分享一个正确 Redis 分布式锁代码实战,让你一飞冲天,该代码可直接用于生产,不是简单的 demo。

温馨提示:如果你只想看代码实战部分,可直接翻到 SpringBoot 实战章节。

错误的分布式锁

说正确方案之前,先来一个错误的,知道错在哪,才能意识到如何写正确。

在银行工作的小白老师,使用 Redis SET 指令实现加锁, 指令满足了当 key 不存在则设置 value,同时设置超时时间,并且满足原子语意。

SET lockKey 1 NX PX expireTime
  • lockKey 表示锁的资源,value 设置成 1。
  • NX:表示只有 lockKey 不存在的时候才能 SET 成功,从而保证只有一个客户端可以获得锁。
  • PX expireTime设置锁的超时时间,单位是毫秒;也可以使用 EX seconds以秒为单位设置超时时间。

至于解锁操作,小白老师果决的使用 DEL指令删除。一个分布式锁方案出来了,一气呵成,组员不明觉厉,纷纷竖起大拇指,伪代码如下。

//加锁成功
if(jedis.set(lockKey, 1, "NX", "EX", 10) == 1){
  try {
      do work //执行业务
  } finally {
    //释放锁
     jedis.del(key);
  }
}

然而,这是一个错误的分布式锁。问题在于解锁的操作有可能出现释放别人的锁的情况。

有可能出现释放别人的锁的情况。

  1. 客户端 A 获取锁成功,设置超时时间 10 秒。
  2. 客户端 A 执行业务逻辑,但是因为某些原因(网络问题、FullGC、代码垃圾性能差)执行很慢,时间超过 10 秒,锁因为超时自动释放了。
  3. 客户端 B 加锁成功。
  4. 客户端 A 执行 DEL 释放锁,相当于把客户端 B 的锁释放了。

原因很简单:客户端加锁时,没有设置一个唯一标识。释放锁的逻辑并不会检查这把锁的归属,直接删除。

残血版分布式锁

小白老师:“码哥,怎么解决释放别人的锁的情况呢?”

解决方法:客户端加锁时设置一个“唯一标识”,可以让 value 存储客户端的唯一标识,比如随机数、 UUID 等;释放锁时判断锁的唯一标识与客户端的标识是否匹配,匹配才能删除。

加锁

SET lockKey randomValue NX PX 3000

解锁

删除锁的时候判断唯一标识是否匹配伪代码如下。

if (jedis.get(lockKey).equals(randomValue)) {
    jedis.del(lockKey);
}

加锁、解锁的伪代码如下所示。

try (Jedis jedis = pool.getResource()) {
  //加锁成功
  if(jedis.set(lockKey, randomValue, "NX", "PX", 3000) == 1){
    do work //执行业务
  }
} finally {
  //判断是不是当前线程加的锁,是才释放
  if (randomValue.equals(jedis.get(keylockKey {
     jedis.del(lockKey); //释放锁
  }
}

到这里,很多公司可能都是使用这个方式来实现分布式锁。

小白:“码哥,还有问题。判断锁的唯一标识是否与当前客户端匹配和删除操作不是原子操作。”

聪明。这个方案还存在原子性问题,存在其他客户端把锁给释放的问题。

  1. 客户端 A 执行唯一标识匹配成功,还来不及执行DEL释放锁操作,锁过期被释放
  2. 客户端 B 获取锁成功,value 设置了自己的客户端唯一标识。
  3. 客户端 A 继续执行 DEL删除锁操作,相当于把客户端 B 的锁给删了。

青铜版分布式锁

虽然叫青铜版,这也是我们最常用的分布式锁方案之一了,这个版本没有太大的硬伤,并且比较简单。

小白老师:“码哥,这如何是好,如何解决解锁不是原子操作的问题?分布式锁这么多门道,是我肤浅了。”

解决方案很简单,解锁的逻辑我们可以通过 Lua 脚本来实现判断和删除的过程。

  • KEYS[1]是 lockKey。
  • ARGV[1] 表示客户端的唯一标识 requestId。

返回 nil 表示锁不存在,已经被删除了。只有返回值是 1 才表示加锁成功。

// key 不存在,返回 null
if (redis.call('exists', KEYS[1]) == 0) then
   return nil;
end;
// 获取 KEY[1] 中的 value 与 ARGV[1] 匹配,匹配则 del,返回 1。不匹配 return 0 解锁失败
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1]);
else
    return 0;
end;

使用上面的脚本,每个锁都用一个随机值作为唯一标识,当删除锁的客户端的“唯一标识”与锁的 value 匹配的时候,才能执行删除操作。这个方案已经相对完美,我们用的最多的可能就是这个方案了。

理论知识学完了,上实战。

Spring Boot 环境准备

接下来码哥,给你一个基于 Spring Boot 并且能用于生产实战的代码。在上实战代码之前,先把 Spring Boot 集成 Redis 的环境搞定。

添加依赖

代码基于 Spring Boot 2.7.18 ,使用 lettuce 客户端来操作 Redis。添加 spring-boot-starter-data-redis依赖。

<dependencyManagement>
    <dependencies>
        <dependency>
            <!-- Import dependency management from Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.7.18</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <optional>true</optional>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <!--redis依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--Jackson依赖-->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

SpringBoot 配置

先配置 yaml。

server:
  servlet:
    context-path: /redis
  port: 9011
spring:
  application:
    name: redis
  redis:
    host: 127.0.0.1
    port: 6379
    password: magebyte
    timeout: 6000
    client-type: lettuce
    lettuce:
      pool:
        max-active: 300
        max-idle: 100
        max-wait: 1000ms
        min-idle: 5

RedisTemplate 默认序列化方式不具备可读性,我们改下配置,使用 JSON 序列化。注意了,这一步是附加操作,与分布式锁没有关系,是码哥顺带给你的彩蛋。

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(connectionFactory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer); // key的序列化类型
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 方法过期,改为下面代码
//        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value的序列化类型
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

把分布式锁接口定义出来,所谓面向接口和对象编程,代码如有神。顺带用英文显摆下什么叫做专业。

/**
 * 分布式锁
 */
public interface Lock {
    /**
     * Tries to acquire the lock with defined <code>leaseTime</code>.
     * Waits up to defined <code>waitTime</code> if necessary until the lock became available.
     * <p>
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param waitTime  the maximum time to acquire the lock
     * @param leaseTime lease time
     * @param unit      time unit
     * @return <code>true</code> if lock is successfully acquired,
     * otherwise <code>false</code> if lock is already set.
     * @throws InterruptedException - if the thread is interrupted
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /**
     * Acquires the lock with defined <code>leaseTime</code>.
     * Waits if necessary until lock became available.
     * <p>
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param leaseTime the maximum time to hold the lock after it's acquisition,
     *                  if it hasn't already been released by invoking <code>unlock</code>.
     *                  If leaseTime is -1, hold the lock until explicitly unlocked.
     * @param unit      the time unit
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * Releases the lock.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>A {@code Lock} implementation will usually impose
     * restrictions on which thread can release a lock (typically only the
     * holder of the lock can release it) and may throw
     * an (unchecked) exception if the restriction is violated.
     * Any restrictions and the exception
     * type must be documented by that {@code Lock} implementation.
     */
    void unlock();
}

青铜分布式锁实战

DistributedLock 实现 Lock 接口,构造方法实现 resourceNameStringRedisTemplate 的属性设置。客户端唯一标识使用uuid:threadId 组成。

DistributedLock

public class DistributedLock implements Lock {
    /**
     * 标识 id
     */
    private final String id = UUID.randomUUID().toString();
    /**
     * 资源名称
     */
    private final String resourceName;
    private final List<String> keys = new ArrayList<>(1);
    /**
     * redis 客户端
     */
    private final StringRedisTemplate redisTemplate;
    public DistributedLock(String resourceName, StringRedisTemplate redisTemplate) {
        this.resourceName = resourceName;
        this.redisTemplate = redisTemplate;
        keys.add(resourceName);
    }
    private String getRequestId(long threadId) {
        return id + ":" + threadId;
    }
}

加锁 tryLock、lock

tryLock 以阻塞等待 waitTime 时间的方式来尝试获取锁。获取成功则返回 true,反之 false。tryAcquire 方法相当于执行了 Redis 的SET resourceName uuid:threadID NX PX {leaseTime} 指令。

tryLock不同的是, lock 一直尝试自旋阻塞等待获取分布式锁,直到获取成功为止。而 tryLock 只会阻塞等待 waitTime 时间。

此外,为了让程序更加健壮,码哥实现了阻塞等待获取分布式锁,让你用的更加开心,面试不慌加薪不难。如果你不需要自旋阻塞等待获取锁,那把 while 代码块删除即可。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 获取锁
    Boolean isAcquire = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (Boolean.TRUE.equals(isAcquire)) {
        return true;
    }
    time -= System.currentTimeMillis() - current;
    // 等待时间用完,获取锁失败
    if (time <= 0) {
        return false;
    }
    // 自旋获取锁
    while (true) {
        long currentTime = System.currentTimeMillis();
        isAcquire = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (Boolean.TRUE.equals(isAcquire)) {
            return true;
        }
        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            return false;
        }
    }
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
    long threadId = Thread.currentThread().getId();
    Boolean acquired;
    do {
        acquired = tryAcquire(leaseTime, unit, threadId);
    } while (Boolean.TRUE.equals(acquired));
}
private Boolean tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return redisTemplate.opsForValue().setIfAbsent(resourceName, getRequestId(threadId), leaseTime, unit);
}

解锁 unlock

解锁的逻辑是通过执行 lua 脚本实现。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    // 执行 lua 脚本
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LuaScript.unlockScript(), Long.class);
    Long opStatus = redisTemplate.execute(redisScript, keys, getRequestId(threadId));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + threadId);
    }
}

LuaScript

其实这个脚本就是在讲解青铜板分布式锁原理的那段代码,具体逻辑已经解释过,这里就不再重复分析。

public class LuaScript {
    private LuaScript() {
    }
    /**
     * 分布式锁解锁脚本
     *
     * @return 当且仅当返回 `1`才表示加锁成功.
     */
    public static String unlockScript() {
        return "if (redis.call('exists', KEYS[1]) == 0) then " +
                "return nil;" +
                "end; "+
                "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                "   return redis.call('del',KEYS[1]);" +
                "else" +
                "   return 0;" +
                "end;";
    }
}

RedisLockClient

最后,还需要提供一个客户端给方便使用。

@Component
public class RedisLockClient {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public Lock getLock(String name) {
        return new DistributedLock(name, redisTemplate);
    }
}

单元测试来一个。

@Slf4j
@SpringBootTest(classes = RedisApplication.class)
public class RedisLockTest {
    @Autowired
    private RedisLockClient redisLockClient;
    @Test
    public void testLockSuccess() throws InterruptedException {
        Lock lock = redisLockClient.getLock("order:pay");
        try {
            boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
            if (!isLock) {
                log.warn("加锁失败");
                return;
            }
            TimeUnit.SECONDS.sleep(3);
            log.info("业务逻辑执行完成");
        } finally {
            lock.unlock();
        }
    }
}

有两个点需要注意。

  1. 释放锁的代码一定要放在 finally{} 块中。否则一旦执行业务逻辑过程中抛出异常,程序就无法执行释放锁的流程。只能干等着锁超时释放。
  2. 加锁的代码应该写在 try {} 代码中,放在 try 外面的话,如果执行加锁异常(客户端网络连接超时),但是实际指令已经发送到服务端并执行,就会导致没有机会执行解锁的代码。

小白:“码哥,这个方案你管它叫青铜级别而已,这么说还有王者、超神版?我们公司还用错误版分布式锁,难怪有时候出现重复订单,是我肤浅了。”

赶紧将这个方案替换原来的错误或者残血版的 Redis 分布式锁吧。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
20天前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
461 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
23天前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
152 83
|
5天前
|
存储 Java 文件存储
🗄️Spring Boot 3 整合 MinIO 实现分布式文件存储
本文介绍了如何基于Spring Boot 3和MinIO实现分布式文件存储。随着应用规模扩大,传统的单机文件存储方案难以应对大规模数据和高并发访问,分布式文件存储系统成为更好的选择。文章详细讲解了MinIO的安装、配置及与Spring Boot的整合步骤,包括Docker部署、MinIO控制台操作、Spring Boot项目中的依赖引入、配置类编写及工具类封装等内容。最后通过一个上传头像的接口示例展示了具体的开发和测试过程,强调了将API操作封装成通用工具类以提高代码复用性和可维护性的重要性。
81 6
🗄️Spring Boot 3 整合 MinIO 实现分布式文件存储
|
19天前
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
48 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
1月前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
130 6
Redis,分布式缓存演化之路
|
2月前
基于springboot+thymeleaf+Redis仿知乎网站问答项目源码
基于springboot+thymeleaf+Redis仿知乎网站问答项目源码
165 36
|
5月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
254 5
|
4月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
127 8
|
4月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
99 16