【Redis核心知识 四】Redis分布式锁实战(上)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Redis核心知识 四】Redis分布式锁实战(上)

最近在公司做的一个功能是这样的,异步的方式进行导入任务处理也即:客户机生产消息,将消息通过RabbitMQ服务器分发到部署有消费代码的服务端机器进行并发处理,这样做保证了消息的异步处理,但是可能大消息量会对机器有较大的压力,所以所有通过RabbitMQ传递的消息在传递到消费端后首先要被放置到一个缓存队列里排队,然后每台机器再有条不紊的启用线程来消费队列里的消息。这样貌似很完美,但是在使用的过程中遇到了一个业务难题:

  1. 用户希望对这些到来的消息分组,并且在整体并发的前提下能控制相同分组的消息串行
  2. 用户并不希望会因为消息分组带来较大的性能损耗

基于以上两点业务问题,我使用Redis的分布式锁来解决。设定实体编码+租户id为分组编码,给每个导入消息生成一个分组编码,同一分组下的导入任务不能被同时执行,只能被串行化执行,不同分组的任务则不受此限制,通俗的说就是:总体任务仍然是并行执行,同一分组的任务则串行执行。这样势必会降低性能,所以为了保证代码隔离和性能稳定,需要开启一套新的处理消息分组的线程和一个专门用来填充分组导入消息的Redis消息队列,这样未开启导入串行化配置的应用和租户不会受到任何影响,依旧走之前的逻辑,而开启了串行化配置的应用和租户则有额外的线程去处理任务,这样性能即使会下降,影响也很有限。开销仅仅是开启两个线程和增加一个分组的Reids消息队列。

消息分发

当然第一步需要做的就是对消息进行分组,然后再开启一个存放分组消息的队列,读取到分组的消息就塞到分组的缓存队列里,这样消息分发的流程就变成了如下图所示:

当然为了兼容以前的逻辑,只有开启了配置(有分组需求的客户)才会使用分组消息队列,整体流程如下:

  1. 客户机进行操作,产生任务消息,发往RabbitMQ服务器
  2. RabbitMQ服务器将任务消息分发到各个消费端服务器
  3. 各个消费端服务器在接收到任务消息后区分是否需要对消息分组,如果需要读取其携带的附属信息,给每个任务消息生成一个分组编码,将该任务信息存放到分组消息缓存队列,如果不需要直接存放到原有的不需分组的消息缓存队列。

这样就完成了第一步,消息分组分发:分组消息通过Reids的Lpush操作依次进入分组消息队列中 ,当然入队的时候也是要上队列锁(下边会提到)的,系统线程尝试获取该队列锁,获取成功则执行入队操作,获取失败则休眠800毫秒后继续尝试获取,不管消息入队成功与否,都要释放队列锁,防止导入任务执行受影响。这一段代码的处理流程如下:

public MessageResult TaskRoute(ImportRequestDataModel data, MessageResult messageResult)
        {
            var tenantId = data.TenantId;
            var userId = data.UserId;
            //如果该任务属于开启了分组的任务则进入分组消息队列
            if (ImportParallelismHelper.IsEnableImportParallelism(data.AppName, data.TenantId))
            {
                while (true)
                {
                    if (ImportGroupQueue.GetQueueLock())
                    {
                        break;
                    }
                    System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力
                }
                var enQueueResult = ImportGroupQueue.En_Queue(data, tenantId); //开启导入消息队列,将导入消息插入到缓存消息队列头部
                if (!enQueueResult)
                {
                    _loggging.Error("导入数据 In_GroupQueue失败:userId:" + userId + " tenantId:" + tenantId);
                    ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁
                    return messageResult;
                }
                _loggging.Debug("当前入队消息的model为" + Common.Serialize.SerializeHelper.Serialize(data));
                _loggging.Debug("入队消息总数为" + ImportGroupQueue.RedisIncr("count"));
                ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁
            }
            else //若未开启分组,则任务进入原有队列,走以前的逻辑
            {
                var enQueueResult = ImportQueue.En_Queue(data, tenantId);  //开启导入消息队列,将导入消息插入到缓存消息队列头部
                if (!enQueueResult)
                {
                    _loggging.Error("导入数据 In_Queue失败:userId:" + userId + " tenantId:" + tenantId);
                    return messageResult;
                }
            }
            return messageResult;
        }

当然其中将消息入队用到了Redis的Lpush操作:

/// <summary>
        /// 开启导入消息队列,将导入消息插入到缓存消息队列头部
        /// </summary>
        /// <param name="model">导入请求的model</param>
        /// <param name="tenantId">租户id</param>
        /// <returns></returns>
        public static bool En_Queue(ImportRequestDataModel model, int tenantId)
        {
            try
            {
                if (model != null)
                {
                    const string key = ImportQueueRedisKey;  //设置redis导入队列的key值
                    var jsonValue = Common.Serialize.SerializeHelper.Serialize(model);
                    RedisSet_LPush(key, jsonValue);
                    return true;
                }
                else
                {
                    return false;
                }
            }
            catch (Exception ex)
            {
                Loggging.Error("租户:" + tenantId + ", Enqueue失败,ImportRequestDataModel 数据:" + Common.Serialize.SerializeHelper.Serialize(model), ex);
                return false;
            }
        }

当然为了统计不同机器入队的消息总和,使用Redis的自增来进行监控:

/// <summary>
        /// Redis自增计数
        /// </summary>
        /// <param name="key"></param>
        public static long RedisIncr(string key)
        {
            //申请成功标志
            const int tenantId = TenantIdInRedis;
            try
            {
                using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId))
                {
                    //redis中申请key-value
                    return redis.Incr(key);
                }
            }
            catch (Exception ex)
            {
                //进行创建异常的操作
                Loggging.Error($"在redis Incr[{key}]异常", ex);
            }
            finally
            {
                //进行创建成功或失败的操作
                Loggging.Info($"在redis LPush[{key}]");
            }
            return 0;
        }

消息处理

针对分组消息,我们在对分组消息做处理的时候的核心逻辑就是:每个分组作为一个唯一标识其实可以作为一个锁的编码,我把它称之为分组锁,只有持有分组锁的线程才能对分组消息做处理,这样就保证了同一分组的消息被串行的按照队列的先后顺序处理,而不同分组的消息还是被并行处理,对性能没有太大的影响。基于此,每个线程都需要遍历分组队列的数据,获取到后尝试获取每个任务的分组锁,如果能获取到则将任务出队,执行该任务并且给该分组上锁,当然在这个过程中,为了防止别的线程也同时放完队列,造成并发操作,我们还需要给队列上一个队列锁。具体的实现逻辑如下图所示:

整体的执行流程如下:

  1. 分组线程每隔2秒尝试获取分组消息队列锁(也就是控制权),获取之后遍历队列尝试获取分组锁(分组的控制权)
  2. 如果获取成功,则开始操作队列里的数据,从右向左遍历导入消息的model,每拿到一个model都获取其分组信息,如果该分组已被上锁,则说明相同分组的消息正在被其它线程处理,则去寻找下一个
  3. 如果该线程找到了未被上分组锁的消息,则将该导入消息序列化存储到以自己的线程编码为key的缓存里(如果线程意外关闭,下次重新启动的时候还能从缓存里重新获取到任务继续执行)并且给该分组上锁,防止同一分组的消息被其它线程处理,将该分组消息出队,然后释放队列锁,开始处理该导入消息,消息处理完成后,将自己的线程编码缓存设置为空,释放该分组锁,最后休眠2秒后继续尝试获取该队列的控制权。
  4. 如果该线程没有找到未被上分组锁的消息,释放队列锁并且在休眠2秒后继续尝试获取队列的控制权
  5. 如果线程在执行过程中关闭,再次重启的时候,会先从自己的线程编码缓存里尝试获取导入消息model,如果获取到则说明上次没有处理完,则需要继续处理,当然继续处理前需要确认当前线程是否持有该消息所属分组的分组锁,只有持有分组锁才能执行操作,否则只能等待并尝试获取分组锁,直到获取成功后执行。如果获取不到则证明重启前任务已执行完,则继续执行下边的逻辑从分组消息队列获取消息处理即可,走1-4部分的逻辑

这样保证了即使线程意外关闭不丢消息,线程任务同组串行执行,不同组并行执行的过程。整段执行流程的代码如下:

/// <summary>
        /// 单个处理导入数据的线程方法(分组的写法)
        /// </summary>
        /// <param name="queueIndexObj"></param>
        private void DealImportDataInGroupQueue(object queueIndexObj)
        {
            var queueIndex = Convert.ToInt32(queueIndexObj);
            ImportRequestDataModel data = null;
            try
            {
                data = ImportGroupQueue.GetLastTimeThreadUnDealData(queueIndex);//获取上次线程的导入数据
                if (data != null)  //如果data不为null
                {
                    var groupKey = ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId);
                    while (true)
                    {
                        if (ImportGroupQueue.GetGroupLock(groupKey))  //如果可以获取到该分组的锁则处理残留数据
                        {
                            break;
                        }
                        System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力
                    }
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                    _loggging.Info("处理上次退出时留在redis中的数据。线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data));
                    ImportData(data, queueIndex);
                    ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex);
                    ImportGroupQueue.GroupUnLock(data);  //参数数据处理完毕后,释放该锁
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                }
            }
            catch (Exception ex)
            {
                _loggging.Error("处理上次退出时留在redis中的数据发生异常:线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data), ex);
            }
            _loggging.Debug("队列长度为" + ImportGroupQueue.GetRedisQueueLength());
            while (true)
            {
                if (ImportGroupQueue.GetRedisQueueLength() > 0)
                {
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "试图访问队列");
                    if (ImportGroupQueue.GetQueueLock())
                    {
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "访问队列成功");
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有队列锁:QueueLock");
                        data = ImportGroupQueue.De_Queue(queueIndex);
                        ImportGroupQueue.QueueUnLock(); //无论取到的data是否为空,都要给缓存队列解锁
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了队列锁:QueueLock");
                        if (data != null)
                        {
                            ImportData(data, queueIndex);
                            _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "对分组" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "的导入任务执行完成");
                            ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex); //清空当前缓存
                            ImportGroupQueue.GroupUnLock(data); //给该分组解锁
                            _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                        }
                    }
                }
                System.Threading.Thread.Sleep(2000);//先设置,每次处理完之后,休眠2秒钟在,保证不过于频繁的请求redis造成redis压力
            }
        }

其中较为核心的遍历队列获取分组消息的内容如下:

/// <summary>
        ///将导入消息从总的缓存队列尾部取出并存储到单个机器上的缓存队列中
        /// </summary>
        /// <param name="threadIndex"></param>
        /// <returns></returns>
        public static ImportRequestDataModel De_Queue(int threadIndex)
        {
            //先从redis中寻找自己线程对应的数据,如果有,这说明还存在上次服务停止前没有处理完的数据,先处理这些数据。如果没有,则去获取redis 队列中的数据处理
            ImportRequestDataModel model = null;
            const string rkey = ImportQueueRedisKey;       
            Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "开启对队列内容的检索,寻找未添加分组锁的导入消息");
            var queueLen = RedisGetLength(rkey); //获取redis队列长度
            for (var i = queueLen - 1; i >= 0; i--)  //从队列右边开始倒序获取model
            {
                model = RedisLIndex(ImportQueueRedisKey, i);  //获取该model
                var key = ThreadRedisKeyPrefix + threadIndex;  //当前机器当前线程的缓存
                if (model != null)
                {
                    var jsonValue = Common.Serialize.SerializeHelper.Serialize(model);
                    Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "当前判定的model为:" + jsonValue + "其队列顺序为:" + i);
                    //获取分组名并先从锁缓存里判断是否存在该分组的导入消息
                    var groupKey = ImportParallelismHelper.GetMessageGroupId(model.MetaObjName, model.TenantId);
                    Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "尝试获取分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId);
                    //如果为true表明分组表里不存在该分组,设置锁并执行该分组的任务
                    if (GetGroupLock(groupKey))
                    {
                        Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId);
                        //设置该导入任务到单台机器线程的redis里
                        RedisSetOne(key, jsonValue);
                        Loggging.Debug("当前出队model序列化为" + jsonValue + "其队列顺序为:" + i);
                        RedisLRem(jsonValue); //从消息队列里移除该元素
                        Loggging.Debug("队列长度为" + RedisGetLength(rkey));
                        break; //找到可以处理的元素后就跳出循环
                    }
                    else //如果没有获取到该分组的锁,则该model不需要出队,设置返回的model为null并且设置当前线程空置
                    {
                        model = null;
                        RedisSetOne(key, "");
                        Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "获取分组锁:" + groupKey + "失败");
                    }
                }
                else
                {
                    RedisSetOne(key, "");
                }
            }
            return model;
        }

我们都知道Redis是单线程的,所以天然不存在并发问题,name我们来看看如何通过Redis来实现队列锁和分组锁。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
24天前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
471 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
27天前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
160 83
|
23天前
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
52 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
11天前
|
机器学习/深度学习 分布式计算 API
Python 高级编程与实战:深入理解并发编程与分布式系统
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发、API 设计、网络编程和异步IO。本文将深入探讨 Python 在并发编程和分布式系统中的应用,并通过实战项目帮助你掌握这些技术。
|
9天前
|
消息中间件 分布式计算 并行计算
Python 高级编程与实战:构建分布式系统
本文深入探讨了 Python 中的分布式系统,介绍了 ZeroMQ、Celery 和 Dask 等工具的使用方法,并通过实战项目帮助读者掌握这些技术。ZeroMQ 是高性能异步消息库,支持多种通信模式;Celery 是分布式任务队列,支持异步任务执行;Dask 是并行计算库,适用于大规模数据处理。文章结合具体代码示例,帮助读者理解如何使用这些工具构建分布式系统。
|
1月前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
130 6
Redis,分布式缓存演化之路
|
14天前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
|
1月前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
本教程演示如何在ACK中多机分布式部署DeepSeek R1满血版。
|
2月前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
50 9
|
存储 缓存 NoSQL
Redis实战之入门进阶到精通
Redis 是一个远程内存数据库,它不仅性能强劲,而且还具有复制特性以及为解决问题而生的独一无二的数据模型。Redis 提供了 5 种不同类型的数据结构,各式各样的问题都可以很自然地映射到这些数据结构上:Redis 的数据结构致力于帮助用户解决问题,而不会像其他数据库那样,要求用户扭曲问题来适应数据库。除此之外,通过复制、持久化(persistence)和客户端分片(client-side sharding)等特性,用户可以很方便地将 Redis 扩展成一个能够包含数百 GB 数据、每秒处理上百万次请求的系统。
Redis实战之入门进阶到精通