一文读懂分布式锁——使用SpringBoot+Redis实现分布式锁解决方案

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 随着现在分布式架构越来越盛行,在很多场景下需要使用到分布式锁。很多小伙伴对于分布式锁还不是特别了解,所以特地总结了一篇文章,让大家一文读懂分布式锁的前世今生。分布式锁的实现有很多种,比如基于数据库、Redis 、 zookeeper 等实现,本文的示例主要介绍使用Redis实现分布式锁。

随着现在分布式架构越来越盛行,在很多场景下需要使用到分布式锁。很多小伙伴对于分布式锁还不是特别了解,所以特地总结了一篇文章,让大家一文读懂分布式锁的前世今生。

分布式锁的实现有很多种,比如基于数据库、Redis 、 zookeeper 等实现,本文的示例主要介绍使用Redis实现分布式锁。


一、什么是分布式锁

分布式锁,即分布式系统中的锁,分布式锁是控制分布式系统有序的对共享资源进行操作,在单体应用中我们通过锁实现共享资源访问,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。


可能初学的小伙伴就会有疑问,Java多线程中的公平锁、非公平锁、自旋锁、可重入锁、读写锁、互斥锁这些都还没闹明白呢?怎么有出来一个分布式锁?


其实,可以这么理解:Java的原生锁是解决多线程下对于共享资源的操作,而分布式锁则是多进程下对于共享资源的操作。分布式系统中竞争共享资源的最小粒度从线程升级成了进程。


分布式锁已经被应用到各种高并发的场景下,典型场景案例包括:秒杀、车票、订单、退款、库存等场景。


二、为什么要使用分布式锁

在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。


目前几乎很多大型网站及应用都是分布式部署的,如何保证分布式场景中的数据一致性问题一直是一个比较重要的话题。在某些场景下,为了保证数据的完整性和一致性,我们需要保证一个方法在同一时间内只能被同一个线程执行,这就需要使用分布式锁。

image.png

如上图所示,假设用户A和用户B同时购买了某款商品,订单创建成功后,下单系统A和下单系统B就会同时对数据库中的该款商品的库存进行扣减。如果此时不加任何控制,系统B提交的数据更新就会覆盖系统A的数据,导致库存错误,超卖等问题。


三、分布式锁应该具备哪些条件

在介绍分布式锁的实现方式之前,先了解一下分布式锁应该具备哪些条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;

2、高可用的获取锁与释放锁;

3、高性能的获取锁与释放锁;

4、具备可重入特性;

5、具备锁失效机制,防止死锁;

6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。


四、分布式锁的实现方式

随着业务发展的需要,原来的单体应用被演化成分布式集群系统后,由于系统分布在不同机器上,这就使得原有的并发控制锁策略失效,为了解决这个问题就需要一种跨进程的互斥机制来控制共享资源的访问,这就需要用到分布式锁!

分布式锁的实现有多种方式,下面介绍下这几种分布式锁的实现:

  • 基于数据库实现分布式锁,(适用于并发小的系统);
  • 基于缓存(Redis等)实现分布式锁,(效率高,最流行,存在锁超时的问题);
  • 基于Zookeeper实现分布式锁,(可靠,但是效率不高);

尽管有这三种方案,但是不同的业务也要根据自己的情况进行选型,他们之间没有最好只有更适合!


五、基于Redis实现分布式锁

使用Redis实现分布式锁是目前比较流行的解决方案,主要是使用Redis 获取锁与释放锁效率都很高,实现方式也特别简单。

实现原理:

(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为线程ID,通过此在释放锁的时候进行判断。

(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。

(3)释放锁的时候,通过线程ID判断是不是该锁,若是该锁,则执行delete进行锁释放。

说完了Redis分布式锁的实现原理,接下来就带大家一步一步在SpringBoot项目中使用Redis 实现分布式锁。


第一步,创建Spring Boot项目,并引入相关依赖。

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

第二步,创建Redis分布式锁通用操作类,示例代码如下:

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Lua脚本
 * // 加锁
 * if
 *     redis.call('setNx',KEYS[1],ARGV[1])
 *   then
 *     if redis.call('get',KEYS[1])==ARGV[1]
 *     return redis.call('expire',KEYS[1],ARGV[2])
 *   else
 *     return 0
 *   end
 * end
 *
 * // 解锁
 *   redis.call('get', KEYS[1]) == ARGV[1]
 * then
 *   return redis.call('del', KEYS[1])
 * else
 *   return 0
 *
 *
 *   //更新时间
 *   if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end
 *
 *
 *
 *
 */
@Slf4j
@Component
public class RedisLockUtils {
    @Resource
    private RedisTemplate redisTemplate;
    private static Map<String, LockInfo> lockInfoMap = new ConcurrentHashMap<>();
    private static final Long SUCCESS = 1L;
    public static class LockInfo {
        private String key;
        private String value;
        private int expireTime;
        //更新时间
        private long renewalTime;
        //更新间隔
        private long renewalInterval;
        public static LockInfo getLockInfo(String key, String value, int expireTime) {
            LockInfo lockInfo = new LockInfo();
            lockInfo.setKey(key);
            lockInfo.setValue(value);
            lockInfo.setExpireTime(expireTime);
            lockInfo.setRenewalTime(System.currentTimeMillis());
            lockInfo.setRenewalInterval(expireTime * 2000 / 3);
            return lockInfo;
        }
        public String getKey() {
            return key;
        }
        public void setKey(String key) {
            this.key = key;
        }
        public String getValue() {
            return value;
        }
        public void setValue(String value) {
            this.value = value;
        }
        public int getExpireTime() {
            return expireTime;
        }
        public void setExpireTime(int expireTime) {
            this.expireTime = expireTime;
        }
        public long getRenewalTime() {
            return renewalTime;
        }
        public void setRenewalTime(long renewalTime) {
            this.renewalTime = renewalTime;
        }
        public long getRenewalInterval() {
            return renewalInterval;
        }
        public void setRenewalInterval(long renewalInterval) {
            this.renewalInterval = renewalInterval;
        }
    }
    /**
     * 使用lua脚本加锁
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @Desc 注意事项,redisConfig配置里面必须使用 genericToStringSerializer序列化,否则获取不了返回值
     */
    public boolean tryLock(String lockKey, String value, int expireTime) {
        String luaScript = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        //Object result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey),value,expireTime + "");
        // Object result = redisTemplate.execute(redisScript, new StringRedisSerializer(), new StringRedisSerializer(), Collections.singletonList(lockKey), identity, expireTime);
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("已获取到{}对应的锁!", lockKey);
        if (expireTime >= 10) {
            lockInfoMap.put(lockKey + value, LockInfo.getLockInfo(lockKey, value, expireTime));
        }
        return (boolean) result;
    }
    /**
     * 使用lua脚本释放锁
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock(String lockKey, String value) {
        lockInfoMap.remove(lockKey + value);
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, value);
        log.info("解锁成功:{}", result);
        return (boolean) result;
    }
    /**
     * 使用lua脚本更新redis锁的过期时间
     * @param lockKey
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean renewal(String lockKey, String value, int expireTime) {
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, value, expireTime);
        log.info("更新redis锁的过期时间:{}", result);
        return (boolean) result;
    }
    /**
     *
     * @param lockKey    锁
     * @param value      身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @return 成功返回true, 失败返回false
     */
    public boolean lock(String lockKey, String value, long expireTime) {
        return redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);
    }
    /**
     * redisTemplate解锁
     * @param key
     * @param value
     * @return 成功返回true, 失败返回false
     */
    public boolean unlock2(String key, String value) {
        Object currentValue = redisTemplate.opsForValue().get(key);
        boolean result = false;
        if (StringUtils.isNotEmpty(String.valueOf(currentValue)) && currentValue.equals(value)) {
            result = redisTemplate.opsForValue().getOperations().delete(key);
        }
        return result;
    }
    /**
     * 定时去检查redis锁的过期时间
     */
    @Scheduled(fixedRate = 5000L)
    @Async("redisExecutor")
    public void renewal() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, LockInfo> lockInfoEntry : lockInfoMap.entrySet()) {
            LockInfo lockInfo = lockInfoEntry.getValue();
            if (lockInfo.getRenewalTime() + lockInfo.getRenewalInterval() < now) {
                renewal(lockInfo.getKey(), lockInfo.getValue(), lockInfo.getExpireTime());
                lockInfo.setRenewalTime(now);
                log.info("lockInfo {}", JSON.toJSONString(lockInfo));
            }
        }
    }
    /**
     * 分布式锁设置单独线程池
     * @return
     */
    @Bean("redisExecutor")
    public Executor redisExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("redis-renewal-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        return executor;
    }
}

第三步,创建RedisTemplate 配置类,配置Redistemplate,示例代码如下:

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws Exception {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 创建 序列化类
        GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(genericToStringSerializer);
        return redisTemplate;
    }
}

第四步,实现业务调用,这里以扣减库存为例,示例代码如下:

@RestController
public class IndexController {
    @Resource
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisLockUtils redisLock;
    @RequestMapping("/deduct-stock")
    public String deductStock() {
        String productId = "product001";
        System.out.println("---------------->>>开始扣减库存");
        String key = productId;
        String requestId = productId + Thread.currentThread().getId();
        try {
            boolean locked = redisLock.lock(key, requestId, 10);
            if (!locked) {
                return "error";
            }
            //执行业务逻辑
            //System.out.println("---------------->>>执行业务逻辑:"+appTitle);
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("product001-stock").toString());
            int currentStock = stock-1;
            redisTemplate.opsForValue().set("product001-stock",currentStock);
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(3) *1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("---------------->>>扣减库存结束:current stock:" + currentStock);
            return "success,current stock:" + currentStock;
        } finally {
            redisLock.unlock2(key, requestId);
        }
    }
}


六、验证测试

代码完成之后,开始测试。我们同时启动两个实例,端口号为:8888和8889模拟分布式系统。

接下来,我们分别请求:http://localhost:8888/deduct-stock和http://localhost:8889/deduct-stock,或者使用JMater分别请求这两个地址,模拟高并发的情况。

image.png

通过上图我们可以看到,在批量请求的情况下,库存扣减也没有出现问题。说明分布式锁生效了。


最后

以上,我们就把什么是分布式锁,如何基于Redis 实现分布式锁的解决方案介绍完了。分布式锁是分布式系统中的重要功能组件,希望大家能够熟练掌握。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
19天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
51 5
|
22天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
39 8
|
25天前
|
消息中间件 监控 NoSQL
Redis脑裂问题详解及解决方案
Redis脑裂问题是分布式系统中常见的复杂问题,合理配置Redis Sentinel、使用保护模式、采用分布式锁机制以及优化网络和客户端连接策略等措施,可以有效预防和解决脑裂问题。通过深入理解Redis脑裂问题的成因和影响,采取相应的解决方案,能够提高系统的可用性和数据一致性,保障Redis集群的稳定运行。希望本文能帮助你更好地理解和应对Redis脑裂问题。
31 2
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
57 16
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
40 5
|
缓存 JSON NoSQL
redis在springboot中的使用
redis在springboot中的使用
183 0
|
Java 应用服务中间件 Maven
传统maven项目和现在spring boot项目的区别
Spring Boot:传统 Web 项目与采用 Spring Boot 项目区别
507 0
传统maven项目和现在spring boot项目的区别
|
XML Java 数据库连接
创建springboot项目的基本流程——以宠物类别为例
创建springboot项目的基本流程——以宠物类别为例
156 0
创建springboot项目的基本流程——以宠物类别为例
|
存储 机器学习/深度学习 IDE
SpringBoot 项目与被开发快速迁移|学习笔记
快速学习 SpringBoot 项目与被开发快速迁移
SpringBoot 项目与被开发快速迁移|学习笔记