Redis实现并发阻塞锁方案

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云原生内存数据库 Tair,内存型 2GB
简介: 由于用户同时访问线上的下订单接口,导致在扣减库存时出现了异常,这是一个很典型的并发问题,本篇文章为解决并发问题而生,采用的技术为Redis锁机制+多线程的阻塞唤醒方法。

Redis实现并发阻塞锁方案


由于用户同时访问线上的下订单接口,导致在扣减库存时出现了异常,这是一个很典型的并发问题,本篇文章为解决并发问题而生,采用的技术为Redis锁机制+多线程的阻塞唤醒方法。


在实现Redis锁机制之前,我们需要了解一下前置知识。


一、前置知识

1、多线程

将wait()、notifyAll()归为到多线程的方法中略有一些不恰当,这两个方法是Object中的方法。

① 当调用了wait()方法后,让当前线程进入等待状态,并且让当前线程释放对象锁,等待既为阻塞状态,等待notifyAll()方法的唤醒。

wait()方法和sleep()方法有一些相似之处,都是使当前线程阻塞,但他们实际是有一些区别的。


1. 执行wait() 方法之前需要请求锁,wait()方法执行的时候会释放锁,等待被唤醒的时候竞争锁。

1. sleep()只是让当前线程休眠一段时间,无视锁的存在。

1. wait() 是Object类的方法 sleep()是Thread的静态方法

② notifyAll()方法为唤醒wait()中的线程。

notifyAll() 和 notify() 方法都是可以唤醒调用了wait()方法,而陷入阻塞的线程。

但是notify()是随机唤醒这个阻塞队列中随机的一个线程,而notifyAll()是唤醒所用的调用了wait()方法而陷入阻塞的线程,让他们自己去抢占对象锁。

notifyAll() 和 notify() 也都是必须在加锁的同步代码块中被调用,它们起的是唤醒的作用,不是释放锁的作用,只用在当前同步代码块中的程序执行完,也就是对象锁自然释放了,notifyAll() 和 notify()方法才会起作用,去唤醒线程。

wait()方法一般是和notify() 或者 notifyAll() 方法一起连用的。


2、Redis

加锁的过程本质上就是往Redis中set值,当别的进程也来set值时候,发现里面已经有值了,就只能放弃获取稍后再试。

Redis提供了一个天然实现锁机制的方法。

在Redis客户端的命令为 setnx(set if not exists)

在集成Springboot中采用的方法为:

redisTemplate.opsForValue().setIfAbsent(key, value);

如果里面set值成功会返回True,如果里面已经存在值就会返回False。

在我们实际使用的时候,setIfAbsent()方法并不是总是返回True和False。

如果我们的业务中加了事务,该方法会返回null,不知道这是一个bug还是什么,这是Redis的一个巨坑,浪费了很长时间才发现了这个问题,如果解决此问题可以跳转到第四章。


二、实现原理


分布式锁本质上要实现的目标就是在 Redis 里面占一个位置,当别的进程也要来占时,发现已经有人占在那里了,就只好放弃或者稍后再试。占位一般是使用 setnx(set if not exists) 指令,只允许被一个客户端占位。先来先占, 事办完了,再调用 del 指令释放茅坑。


其中,发现Redis中已经有值了,当前线程是直接放弃还是稍后再试分别就代表着,非阻塞锁和阻塞锁。


在我们的业务场景中肯定是要稍后再试(阻塞锁),如果是直接放弃(非阻塞锁)在数据库层面就可以直接做,就不需要我们在代码大费周章了。


非阻塞锁只能保存数据的正确性,在高并发的情况下会抛出大量的异常,当一百个并发请求到来时,只有一个请求成功,其他均会抛出异常。


Redis非阻塞锁和 MySQL的乐观锁,最终达到的效果是一样的,乐观锁是采用CAS的思想。


乐观锁方法:表字段 加一个版本号,或者别的字段也可以!加版本号,可以知道控制顺序而已!在update 的时候可以where后面加上version= oldVersion。数据库,在任何并发的情况下,update 成功就是 1 失败就是 0 .可以根据返回的 1 ,0 做相应的处理!

我们更推荐大家使用阻塞锁的方式。


当获取不到锁时候,我们让当前线程使用wait()方法唤醒,当持有锁的线程使用完成后,调用notifyAll()唤醒所有等待的方法。


三、具体实现


以下代码为阻塞锁的实现方式。

业务层:

public String test() throws InterruptedException {
        lock("lockKey");
        System.out.println("11");
        System.out.println("22");
        System.out.println(Thread.currentThread().getName()+"***********");
        Thread.sleep(2000);
        System.out.println("33");
        System.out.println("44");
        System.out.println("55");
        unlock("lockKey");
        return "String";
    }


锁的工具类:

主要是加锁和解锁的两个方法。

//每一个redis的key对应一个阻塞对象
    private static HashMap<String, Object> blockers = new HashMap<>();
    //当前获得锁的线程
    private static Thread curThread;
    public static RedisTemplate redisTemplate = (RedisTemplate) SpringUtils.getBean("redisTemplate") ;
    /**
     * 加锁
     * @param key
     * @throws InterruptedException
     */
    public static void lock(String key) {
        //循环判断是否能够创建key, 不能则直接wait释放CPU执行权
        //放不进指说明锁正在被占用
        System.out.println(key+"**");
        while (!RedisUtil.setLock(key,"1",3)){
            synchronized (key) {
                blockers.put(key, key);
                //wait释放CPU执行权
                try {
                    key.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        blockers.put(key, key);
        //能够成功创建,获取锁成功记录当前获取锁线程
        curThread = Thread.currentThread();
    }
    /**
     * 解锁
     * @param key
     */
    public static void unlock(String key) {
        //判断是否为加锁的线程执行解锁, 不是则直接忽略
        if( curThread == Thread.currentThread()) {
            RedisUtil.delete(key);
            //删除key之后需要notifyAll所有的应用, 所以这里采用发订阅消息给所有的应用
          //  RedisUtil.publish("lock", key);
            //notifllall其他线程
            Object lock = blockers.get(key);
            if(lock != null) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        }
    }


当我们在不加锁时候,使用接口测试工具测试时,12345并不能都是顺序执行的,会造成输出顺序不一致,如果是在我们的实际场景中,这是输入换成了数据库的select和update,数据出现错乱也是很正常的情况了。

当我们加上锁以后,12345都是顺序输出,并发问题顺利解决了。


四、附录

1、Redis存在的bug

本来lock()方法是直接调用 "Redis.setIfAbsent()" 方法,但是在使用时候一直报空指针异常,最终定位问题为Redis.setIfAbsent()方法存在问题。

在我的实际业务中,下订单的方法使用了@Transflastion增加了事务,导致该方法返回null,我们手写一个实现setIfAbsent()的作用。

/**
     * 只有key不存在时,才设置值, 返回true, 否则返回false
     *
     * @param key     key 不能为null
     * @param value   value 不能为null
     * @param timeout 过期时长, 单位为妙
     * @return
     */
    public static Boolean setLock(String key,String value, long timeout) {
        SessionCallback<Boolean> sessionCallback = new SessionCallback<Boolean>() {
            List<Object> exec = null;
            @Override
            @SuppressWarnings("unchecked")
            public Boolean execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                redisTemplate.opsForValue().setIfAbsent(key, value);
                redisTemplate.expire(key,timeout, TimeUnit.SECONDS);
                exec = operations.exec();
                if(exec.size() > 0) {
                    return (Boolean) exec.get(0);
                }
                return false;
            }
        };
        return (Boolean) redisTemplate.execute(sessionCallback);
    }

方便对比,以下贴上原本的setIfAbsent()方法。

/**
   * 只有key不存在时,才设置值, 返回true, 否则返回false [警告:事务或者管道情况下会报错-可使用 setLock方法]
   *
   * @param key     key 不能为null
   * @param value   value 不能为null
   * @param timeout 过期时长, 单位为妙
   * @return
   */
  @Deprecated
  public static <T> Boolean setIfAbsent(String key, T value, long timeout) {
     // redisTemplate.multi();
      ValueOperations<String, T> valueOperations = redisTemplate.opsForValue();
      Boolean aBoolean = valueOperations.setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
     // redisTemplate.exec();
    return aBoolean;
  }


2、MySQL的锁机制

在并发场景下MySQL会报错,报错信息如下:

### Cause: com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
; SQL []; Lock wait timeout exceeded; try restarting transaction; nested exception is com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction

问题出现的原因是,某一种表频繁被锁表,导致另外一个事务超时,出现问题的原因是MySQL的机制。


MySQL更新时如果where字段存在索引会使用行锁,否则会使用表锁。

我们使用navichat在where字段上加上索引,问题顺利的迎刃而解。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
10天前
|
存储 NoSQL Redis
Redis系列学习文章分享---第九篇(Redis快速入门之好友关注--关注和取关 -共同关注 -Feed流实现方案分析 -推送到粉丝收件箱 -滚动分页查询)
Redis系列学习文章分享---第九篇(Redis快速入门之好友关注--关注和取关 -共同关注 -Feed流实现方案分析 -推送到粉丝收件箱 -滚动分页查询)
9 0
|
10天前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
33 0
|
2月前
|
负载均衡 监控 NoSQL
Redis的几种主要集群方案
【5月更文挑战第15天】Redis集群方案包括主从复制(基础,读写分离,手动故障恢复)、哨兵模式(自动高可用,自动故障转移)和Redis Cluster(官方分布式解决方案,自动分片、容错和扩展)。此外,还有Codis、Redisson和Twemproxy等工具用于代理分片和负载均衡。选择方案需考虑应用场景、数据量和并发需求,权衡可用性、性能和扩展性。
227 2
|
10天前
|
NoSQL 算法 Java
技术好文:Redis实现分布式锁的7种方案
技术好文:Redis实现分布式锁的7种方案
|
21天前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
11 0
|
1月前
|
存储 NoSQL 算法
Redis集群,集群的概念 三种主流分片方式1.哈希求余 一致性哈希算法:方案三:哈希槽分区算法问题一Redis集群是最多有16384个分片吗问题二:为什么是16384个,集群扩容:1.新的主节点
Redis集群,集群的概念 三种主流分片方式1.哈希求余 一致性哈希算法:方案三:哈希槽分区算法问题一Redis集群是最多有16384个分片吗问题二:为什么是16384个,集群扩容:1.新的主节点
|
2月前
|
缓存 NoSQL Redis
Redis经典问题:数据并发竞争
在大流量系统中,数据并发竞争可能导致系统性能下降和崩溃。为解决此问题,可以采取加写回操作和互斥锁,确保数据一致性并减少写操作对缓存的影响。另外,保持缓存数据多个备份能降低并发竞争概率。通过实例展示了如何在电商网站中应用这些策略,从而提高系统稳定性和性能。关注微信公众号“软件求生”获取更多技术分享。
426 1
|
2月前
|
存储 缓存 NoSQL
【技术分享】求取列表需求的redis缓存方案
【技术分享】求取列表需求的redis缓存方案
53 0
|
2月前
|
NoSQL Java Redis
lua脚本做redis的锁
这段内容是关于使用Redis实现分布式锁的Java代码示例。`RedisLock`类包含`lock`和`unlock`方法,使用`StringRedisTemplate`和Lua脚本进行操作。代码展示了两种加锁方式:一种带有过期时间,另一种不带。还提到了在加锁和解锁过程中的异常处理,并提供了相关参考资料链接。
35 3