【Redis核心知识 四】Redis分布式锁实战(上)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis核心知识 四】Redis分布式锁实战(上)

最近在公司做的一个功能是这样的,异步的方式进行导入任务处理也即:客户机生产消息,将消息通过RabbitMQ服务器分发到部署有消费代码的服务端机器进行并发处理,这样做保证了消息的异步处理,但是可能大消息量会对机器有较大的压力,所以所有通过RabbitMQ传递的消息在传递到消费端后首先要被放置到一个缓存队列里排队,然后每台机器再有条不紊的启用线程来消费队列里的消息。这样貌似很完美,但是在使用的过程中遇到了一个业务难题:

  1. 用户希望对这些到来的消息分组,并且在整体并发的前提下能控制相同分组的消息串行
  2. 用户并不希望会因为消息分组带来较大的性能损耗

基于以上两点业务问题,我使用Redis的分布式锁来解决。设定实体编码+租户id为分组编码,给每个导入消息生成一个分组编码,同一分组下的导入任务不能被同时执行,只能被串行化执行,不同分组的任务则不受此限制,通俗的说就是:总体任务仍然是并行执行,同一分组的任务则串行执行。这样势必会降低性能,所以为了保证代码隔离和性能稳定,需要开启一套新的处理消息分组的线程和一个专门用来填充分组导入消息的Redis消息队列,这样未开启导入串行化配置的应用和租户不会受到任何影响,依旧走之前的逻辑,而开启了串行化配置的应用和租户则有额外的线程去处理任务,这样性能即使会下降,影响也很有限。开销仅仅是开启两个线程和增加一个分组的Reids消息队列。

消息分发

当然第一步需要做的就是对消息进行分组,然后再开启一个存放分组消息的队列,读取到分组的消息就塞到分组的缓存队列里,这样消息分发的流程就变成了如下图所示:

当然为了兼容以前的逻辑,只有开启了配置(有分组需求的客户)才会使用分组消息队列,整体流程如下:

  1. 客户机进行操作,产生任务消息,发往RabbitMQ服务器
  2. RabbitMQ服务器将任务消息分发到各个消费端服务器
  3. 各个消费端服务器在接收到任务消息后区分是否需要对消息分组,如果需要读取其携带的附属信息,给每个任务消息生成一个分组编码,将该任务信息存放到分组消息缓存队列,如果不需要直接存放到原有的不需分组的消息缓存队列。

这样就完成了第一步,消息分组分发:分组消息通过Reids的Lpush操作依次进入分组消息队列中 ,当然入队的时候也是要上队列锁(下边会提到)的,系统线程尝试获取该队列锁,获取成功则执行入队操作,获取失败则休眠800毫秒后继续尝试获取,不管消息入队成功与否,都要释放队列锁,防止导入任务执行受影响。这一段代码的处理流程如下:

public MessageResult TaskRoute(ImportRequestDataModel data, MessageResult messageResult)
        {
            var tenantId = data.TenantId;
            var userId = data.UserId;
            //如果该任务属于开启了分组的任务则进入分组消息队列
            if (ImportParallelismHelper.IsEnableImportParallelism(data.AppName, data.TenantId))
            {
                while (true)
                {
                    if (ImportGroupQueue.GetQueueLock())
                    {
                        break;
                    }
                    System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力
                }
                var enQueueResult = ImportGroupQueue.En_Queue(data, tenantId); //开启导入消息队列,将导入消息插入到缓存消息队列头部
                if (!enQueueResult)
                {
                    _loggging.Error("导入数据 In_GroupQueue失败:userId:" + userId + " tenantId:" + tenantId);
                    ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁
                    return messageResult;
                }
                _loggging.Debug("当前入队消息的model为" + Common.Serialize.SerializeHelper.Serialize(data));
                _loggging.Debug("入队消息总数为" + ImportGroupQueue.RedisIncr("count"));
                ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁
            }
            else //若未开启分组,则任务进入原有队列,走以前的逻辑
            {
                var enQueueResult = ImportQueue.En_Queue(data, tenantId);  //开启导入消息队列,将导入消息插入到缓存消息队列头部
                if (!enQueueResult)
                {
                    _loggging.Error("导入数据 In_Queue失败:userId:" + userId + " tenantId:" + tenantId);
                    return messageResult;
                }
            }
            return messageResult;
        }

当然其中将消息入队用到了Redis的Lpush操作:

/// <summary>
        /// 开启导入消息队列,将导入消息插入到缓存消息队列头部
        /// </summary>
        /// <param name="model">导入请求的model</param>
        /// <param name="tenantId">租户id</param>
        /// <returns></returns>
        public static bool En_Queue(ImportRequestDataModel model, int tenantId)
        {
            try
            {
                if (model != null)
                {
                    const string key = ImportQueueRedisKey;  //设置redis导入队列的key值
                    var jsonValue = Common.Serialize.SerializeHelper.Serialize(model);
                    RedisSet_LPush(key, jsonValue);
                    return true;
                }
                else
                {
                    return false;
                }
            }
            catch (Exception ex)
            {
                Loggging.Error("租户:" + tenantId + ", Enqueue失败,ImportRequestDataModel 数据:" + Common.Serialize.SerializeHelper.Serialize(model), ex);
                return false;
            }
        }

当然为了统计不同机器入队的消息总和,使用Redis的自增来进行监控:

/// <summary>
        /// Redis自增计数
        /// </summary>
        /// <param name="key"></param>
        public static long RedisIncr(string key)
        {
            //申请成功标志
            const int tenantId = TenantIdInRedis;
            try
            {
                using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId))
                {
                    //redis中申请key-value
                    return redis.Incr(key);
                }
            }
            catch (Exception ex)
            {
                //进行创建异常的操作
                Loggging.Error($"在redis Incr[{key}]异常", ex);
            }
            finally
            {
                //进行创建成功或失败的操作
                Loggging.Info($"在redis LPush[{key}]");
            }
            return 0;
        }

消息处理

针对分组消息,我们在对分组消息做处理的时候的核心逻辑就是:每个分组作为一个唯一标识其实可以作为一个锁的编码,我把它称之为分组锁,只有持有分组锁的线程才能对分组消息做处理,这样就保证了同一分组的消息被串行的按照队列的先后顺序处理,而不同分组的消息还是被并行处理,对性能没有太大的影响。基于此,每个线程都需要遍历分组队列的数据,获取到后尝试获取每个任务的分组锁,如果能获取到则将任务出队,执行该任务并且给该分组上锁,当然在这个过程中,为了防止别的线程也同时放完队列,造成并发操作,我们还需要给队列上一个队列锁。具体的实现逻辑如下图所示:

整体的执行流程如下:

  1. 分组线程每隔2秒尝试获取分组消息队列锁(也就是控制权),获取之后遍历队列尝试获取分组锁(分组的控制权)
  2. 如果获取成功,则开始操作队列里的数据,从右向左遍历导入消息的model,每拿到一个model都获取其分组信息,如果该分组已被上锁,则说明相同分组的消息正在被其它线程处理,则去寻找下一个
  3. 如果该线程找到了未被上分组锁的消息,则将该导入消息序列化存储到以自己的线程编码为key的缓存里(如果线程意外关闭,下次重新启动的时候还能从缓存里重新获取到任务继续执行)并且给该分组上锁,防止同一分组的消息被其它线程处理,将该分组消息出队,然后释放队列锁,开始处理该导入消息,消息处理完成后,将自己的线程编码缓存设置为空,释放该分组锁,最后休眠2秒后继续尝试获取该队列的控制权。
  4. 如果该线程没有找到未被上分组锁的消息,释放队列锁并且在休眠2秒后继续尝试获取队列的控制权
  5. 如果线程在执行过程中关闭,再次重启的时候,会先从自己的线程编码缓存里尝试获取导入消息model,如果获取到则说明上次没有处理完,则需要继续处理,当然继续处理前需要确认当前线程是否持有该消息所属分组的分组锁,只有持有分组锁才能执行操作,否则只能等待并尝试获取分组锁,直到获取成功后执行。如果获取不到则证明重启前任务已执行完,则继续执行下边的逻辑从分组消息队列获取消息处理即可,走1-4部分的逻辑

这样保证了即使线程意外关闭不丢消息,线程任务同组串行执行,不同组并行执行的过程。整段执行流程的代码如下:

/// <summary>
        /// 单个处理导入数据的线程方法(分组的写法)
        /// </summary>
        /// <param name="queueIndexObj"></param>
        private void DealImportDataInGroupQueue(object queueIndexObj)
        {
            var queueIndex = Convert.ToInt32(queueIndexObj);
            ImportRequestDataModel data = null;
            try
            {
                data = ImportGroupQueue.GetLastTimeThreadUnDealData(queueIndex);//获取上次线程的导入数据
                if (data != null)  //如果data不为null
                {
                    var groupKey = ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId);
                    while (true)
                    {
                        if (ImportGroupQueue.GetGroupLock(groupKey))  //如果可以获取到该分组的锁则处理残留数据
                        {
                            break;
                        }
                        System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力
                    }
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                    _loggging.Info("处理上次退出时留在redis中的数据。线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data));
                    ImportData(data, queueIndex);
                    ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex);
                    ImportGroupQueue.GroupUnLock(data);  //参数数据处理完毕后,释放该锁
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                }
            }
            catch (Exception ex)
            {
                _loggging.Error("处理上次退出时留在redis中的数据发生异常:线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data), ex);
            }
            _loggging.Debug("队列长度为" + ImportGroupQueue.GetRedisQueueLength());
            while (true)
            {
                if (ImportGroupQueue.GetRedisQueueLength() > 0)
                {
                    _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "试图访问队列");
                    if (ImportGroupQueue.GetQueueLock())
                    {
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "访问队列成功");
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有队列锁:QueueLock");
                        data = ImportGroupQueue.De_Queue(queueIndex);
                        ImportGroupQueue.QueueUnLock(); //无论取到的data是否为空,都要给缓存队列解锁
                        _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了队列锁:QueueLock");
                        if (data != null)
                        {
                            ImportData(data, queueIndex);
                            _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "对分组" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "的导入任务执行完成");
                            ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex); //清空当前缓存
                            ImportGroupQueue.GroupUnLock(data); //给该分组解锁
                            _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId);
                        }
                    }
                }
                System.Threading.Thread.Sleep(2000);//先设置,每次处理完之后,休眠2秒钟在,保证不过于频繁的请求redis造成redis压力
            }
        }

其中较为核心的遍历队列获取分组消息的内容如下:

/// <summary>
        ///将导入消息从总的缓存队列尾部取出并存储到单个机器上的缓存队列中
        /// </summary>
        /// <param name="threadIndex"></param>
        /// <returns></returns>
        public static ImportRequestDataModel De_Queue(int threadIndex)
        {
            //先从redis中寻找自己线程对应的数据,如果有,这说明还存在上次服务停止前没有处理完的数据,先处理这些数据。如果没有,则去获取redis 队列中的数据处理
            ImportRequestDataModel model = null;
            const string rkey = ImportQueueRedisKey;       
            Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "开启对队列内容的检索,寻找未添加分组锁的导入消息");
            var queueLen = RedisGetLength(rkey); //获取redis队列长度
            for (var i = queueLen - 1; i >= 0; i--)  //从队列右边开始倒序获取model
            {
                model = RedisLIndex(ImportQueueRedisKey, i);  //获取该model
                var key = ThreadRedisKeyPrefix + threadIndex;  //当前机器当前线程的缓存
                if (model != null)
                {
                    var jsonValue = Common.Serialize.SerializeHelper.Serialize(model);
                    Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "当前判定的model为:" + jsonValue + "其队列顺序为:" + i);
                    //获取分组名并先从锁缓存里判断是否存在该分组的导入消息
                    var groupKey = ImportParallelismHelper.GetMessageGroupId(model.MetaObjName, model.TenantId);
                    Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "尝试获取分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId);
                    //如果为true表明分组表里不存在该分组,设置锁并执行该分组的任务
                    if (GetGroupLock(groupKey))
                    {
                        Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId);
                        //设置该导入任务到单台机器线程的redis里
                        RedisSetOne(key, jsonValue);
                        Loggging.Debug("当前出队model序列化为" + jsonValue + "其队列顺序为:" + i);
                        RedisLRem(jsonValue); //从消息队列里移除该元素
                        Loggging.Debug("队列长度为" + RedisGetLength(rkey));
                        break; //找到可以处理的元素后就跳出循环
                    }
                    else //如果没有获取到该分组的锁,则该model不需要出队,设置返回的model为null并且设置当前线程空置
                    {
                        model = null;
                        RedisSetOne(key, "");
                        Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "获取分组锁:" + groupKey + "失败");
                    }
                }
                else
                {
                    RedisSetOne(key, "");
                }
            }
            return model;
        }

我们都知道Redis是单线程的,所以天然不存在并发问题,name我们来看看如何通过Redis来实现队列锁和分组锁。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
102 4
|
2月前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
254 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
9天前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
Redis,分布式缓存演化之路
|
3天前
|
人工智能 Kubernetes 异构计算
大道至简-基于ACK的Deepseek满血版分布式推理部署实战
本教程演示如何在ACK中多机分布式部署DeepSeek R1满血版。
|
25天前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
38 9
|
2月前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
236 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
2月前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
82 10
|
2月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
210 5
|
3月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
101 8
|
3月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
83 16