最近在公司做的一个功能是这样的,异步的方式进行导入任务处理也即:客户机生产消息,将消息通过RabbitMQ服务器分发到部署有消费代码的服务端机器进行并发处理,这样做保证了消息的异步处理,但是可能大消息量会对机器有较大的压力,所以所有通过RabbitMQ传递的消息在传递到消费端后首先要被放置到一个缓存队列里排队,然后每台机器再有条不紊的启用线程来消费队列里的消息。这样貌似很完美,但是在使用的过程中遇到了一个业务难题:
- 用户希望对这些到来的消息分组,并且在整体并发的前提下能控制相同分组的消息串行
- 用户并不希望会因为消息分组带来较大的性能损耗
基于以上两点业务问题,我使用Redis的分布式锁来解决。设定实体编码+租户id为分组编码,给每个导入消息生成一个分组编码,同一分组下的导入任务不能被同时执行,只能被串行化执行,不同分组的任务则不受此限制,通俗的说就是:总体任务仍然是并行执行,同一分组的任务则串行执行。这样势必会降低性能,所以为了保证代码隔离和性能稳定,需要开启一套新的处理消息分组的线程和一个专门用来填充分组导入消息的Redis消息队列,这样未开启导入串行化配置的应用和租户不会受到任何影响,依旧走之前的逻辑,而开启了串行化配置的应用和租户则有额外的线程去处理任务,这样性能即使会下降,影响也很有限。开销仅仅是开启两个线程和增加一个分组的Reids消息队列。
消息分发
当然第一步需要做的就是对消息进行分组,然后再开启一个存放分组消息的队列,读取到分组的消息就塞到分组的缓存队列里,这样消息分发的流程就变成了如下图所示:
当然为了兼容以前的逻辑,只有开启了配置(有分组需求的客户)才会使用分组消息队列,整体流程如下:
- 客户机进行操作,产生任务消息,发往RabbitMQ服务器
- RabbitMQ服务器将任务消息分发到各个消费端服务器
- 各个消费端服务器在接收到任务消息后区分是否需要对消息分组,如果需要读取其携带的附属信息,给每个任务消息生成一个分组编码,将该任务信息存放到分组消息缓存队列,如果不需要直接存放到原有的不需分组的消息缓存队列。
这样就完成了第一步,消息分组分发:分组消息通过Reids的Lpush操作依次进入分组消息队列中 ,当然入队的时候也是要上队列锁(下边会提到)的,系统线程尝试获取该队列锁,获取成功则执行入队操作,获取失败则休眠800毫秒后继续尝试获取,不管消息入队成功与否,都要释放队列锁,防止导入任务执行受影响。这一段代码的处理流程如下:
public MessageResult TaskRoute(ImportRequestDataModel data, MessageResult messageResult) { var tenantId = data.TenantId; var userId = data.UserId; //如果该任务属于开启了分组的任务则进入分组消息队列 if (ImportParallelismHelper.IsEnableImportParallelism(data.AppName, data.TenantId)) { while (true) { if (ImportGroupQueue.GetQueueLock()) { break; } System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力 } var enQueueResult = ImportGroupQueue.En_Queue(data, tenantId); //开启导入消息队列,将导入消息插入到缓存消息队列头部 if (!enQueueResult) { _loggging.Error("导入数据 In_GroupQueue失败:userId:" + userId + " tenantId:" + tenantId); ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁 return messageResult; } _loggging.Debug("当前入队消息的model为" + Common.Serialize.SerializeHelper.Serialize(data)); _loggging.Debug("入队消息总数为" + ImportGroupQueue.RedisIncr("count")); ImportGroupQueue.QueueUnLock(); //无论是否能执行入队,都要给缓存队列解锁 } else //若未开启分组,则任务进入原有队列,走以前的逻辑 { var enQueueResult = ImportQueue.En_Queue(data, tenantId); //开启导入消息队列,将导入消息插入到缓存消息队列头部 if (!enQueueResult) { _loggging.Error("导入数据 In_Queue失败:userId:" + userId + " tenantId:" + tenantId); return messageResult; } } return messageResult; }
当然其中将消息入队用到了Redis的Lpush操作:
/// <summary> /// 开启导入消息队列,将导入消息插入到缓存消息队列头部 /// </summary> /// <param name="model">导入请求的model</param> /// <param name="tenantId">租户id</param> /// <returns></returns> public static bool En_Queue(ImportRequestDataModel model, int tenantId) { try { if (model != null) { const string key = ImportQueueRedisKey; //设置redis导入队列的key值 var jsonValue = Common.Serialize.SerializeHelper.Serialize(model); RedisSet_LPush(key, jsonValue); return true; } else { return false; } } catch (Exception ex) { Loggging.Error("租户:" + tenantId + ", Enqueue失败,ImportRequestDataModel 数据:" + Common.Serialize.SerializeHelper.Serialize(model), ex); return false; } }
当然为了统计不同机器入队的消息总和,使用Redis的自增来进行监控:
/// <summary> /// Redis自增计数 /// </summary> /// <param name="key"></param> public static long RedisIncr(string key) { //申请成功标志 const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { //redis中申请key-value return redis.Incr(key); } } catch (Exception ex) { //进行创建异常的操作 Loggging.Error($"在redis Incr[{key}]异常", ex); } finally { //进行创建成功或失败的操作 Loggging.Info($"在redis LPush[{key}]"); } return 0; }
消息处理
针对分组消息,我们在对分组消息做处理的时候的核心逻辑就是:每个分组作为一个唯一标识其实可以作为一个锁的编码,我把它称之为分组锁,只有持有分组锁的线程才能对分组消息做处理,这样就保证了同一分组的消息被串行的按照队列的先后顺序处理,而不同分组的消息还是被并行处理,对性能没有太大的影响。基于此,每个线程都需要遍历分组队列的数据,获取到后尝试获取每个任务的分组锁,如果能获取到则将任务出队,执行该任务并且给该分组上锁,当然在这个过程中,为了防止别的线程也同时放完队列,造成并发操作,我们还需要给队列上一个队列锁。具体的实现逻辑如下图所示:
整体的执行流程如下:
- 分组线程每隔2秒尝试获取分组消息队列锁(也就是控制权),获取之后遍历队列尝试获取分组锁(分组的控制权)
- 如果获取成功,则开始操作队列里的数据,从右向左遍历导入消息的model,每拿到一个model都获取其分组信息,如果该分组已被上锁,则说明相同分组的消息正在被其它线程处理,则去寻找下一个
- 如果该线程找到了未被上分组锁的消息,则将该导入消息序列化存储到以自己的线程编码为key的缓存里(如果线程意外关闭,下次重新启动的时候还能从缓存里重新获取到任务继续执行)并且给该分组上锁,防止同一分组的消息被其它线程处理,将该分组消息出队,然后释放队列锁,开始处理该导入消息,消息处理完成后,将自己的线程编码缓存设置为空,释放该分组锁,最后休眠2秒后继续尝试获取该队列的控制权。
- 如果该线程没有找到未被上分组锁的消息,释放队列锁并且在休眠2秒后继续尝试获取队列的控制权
- 如果线程在执行过程中关闭,再次重启的时候,会先从自己的线程编码缓存里尝试获取导入消息model,如果获取到则说明上次没有处理完,则需要继续处理,当然继续处理前需要确认当前线程是否持有该消息所属分组的分组锁,只有持有分组锁才能执行操作,否则只能等待并尝试获取分组锁,直到获取成功后执行。如果获取不到则证明重启前任务已执行完,则继续执行下边的逻辑从分组消息队列获取消息处理即可,走1-4部分的逻辑
这样保证了即使线程意外关闭不丢消息,线程任务同组串行执行,不同组并行执行的过程。整段执行流程的代码如下:
/// <summary> /// 单个处理导入数据的线程方法(分组的写法) /// </summary> /// <param name="queueIndexObj"></param> private void DealImportDataInGroupQueue(object queueIndexObj) { var queueIndex = Convert.ToInt32(queueIndexObj); ImportRequestDataModel data = null; try { data = ImportGroupQueue.GetLastTimeThreadUnDealData(queueIndex);//获取上次线程的导入数据 if (data != null) //如果data不为null { var groupKey = ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId); while (true) { if (ImportGroupQueue.GetGroupLock(groupKey)) //如果可以获取到该分组的锁则处理残留数据 { break; } System.Threading.Thread.Sleep(800); //先设置,每次处理完之后,休眠800毫秒,保证不过于频繁的请求redis造成redis压力 } _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId); _loggging.Info("处理上次退出时留在redis中的数据。线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data)); ImportData(data, queueIndex); ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex); ImportGroupQueue.GroupUnLock(data); //参数数据处理完毕后,释放该锁 _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + groupKey + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId); } } catch (Exception ex) { _loggging.Error("处理上次退出时留在redis中的数据发生异常:线程:" + queueIndex + " data:" + Common.Serialize.SerializeHelper.Serialize(data), ex); } _loggging.Debug("队列长度为" + ImportGroupQueue.GetRedisQueueLength()); while (true) { if (ImportGroupQueue.GetRedisQueueLength() > 0) { _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "试图访问队列"); if (ImportGroupQueue.GetQueueLock()) { _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "访问队列成功"); _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "持有队列锁:QueueLock"); data = ImportGroupQueue.De_Queue(queueIndex); ImportGroupQueue.QueueUnLock(); //无论取到的data是否为空,都要给缓存队列解锁 _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了队列锁:QueueLock"); if (data != null) { ImportData(data, queueIndex); _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "对分组" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "的导入任务执行完成"); ImportGroupQueue.ClearCurrentDataModelInRedis(queueIndex); //清空当前缓存 ImportGroupQueue.GroupUnLock(data); //给该分组解锁 _loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + queueIndex + "释放了分组锁:" + ImportParallelismHelper.GetMessageGroupId(data.MetaObjName, data.TenantId) + "该分组锁的实体编码为:" + data.AppName + "租户id为:" + data.TenantId); } } } System.Threading.Thread.Sleep(2000);//先设置,每次处理完之后,休眠2秒钟在,保证不过于频繁的请求redis造成redis压力 } }
其中较为核心的遍历队列获取分组消息的内容如下:
/// <summary> ///将导入消息从总的缓存队列尾部取出并存储到单个机器上的缓存队列中 /// </summary> /// <param name="threadIndex"></param> /// <returns></returns> public static ImportRequestDataModel De_Queue(int threadIndex) { //先从redis中寻找自己线程对应的数据,如果有,这说明还存在上次服务停止前没有处理完的数据,先处理这些数据。如果没有,则去获取redis 队列中的数据处理 ImportRequestDataModel model = null; const string rkey = ImportQueueRedisKey; Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "开启对队列内容的检索,寻找未添加分组锁的导入消息"); var queueLen = RedisGetLength(rkey); //获取redis队列长度 for (var i = queueLen - 1; i >= 0; i--) //从队列右边开始倒序获取model { model = RedisLIndex(ImportQueueRedisKey, i); //获取该model var key = ThreadRedisKeyPrefix + threadIndex; //当前机器当前线程的缓存 if (model != null) { var jsonValue = Common.Serialize.SerializeHelper.Serialize(model); Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "当前判定的model为:" + jsonValue + "其队列顺序为:" + i); //获取分组名并先从锁缓存里判断是否存在该分组的导入消息 var groupKey = ImportParallelismHelper.GetMessageGroupId(model.MetaObjName, model.TenantId); Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "尝试获取分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId); //如果为true表明分组表里不存在该分组,设置锁并执行该分组的任务 if (GetGroupLock(groupKey)) { Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "持有分组锁:" + groupKey + "该分组锁的实体编码为:" + model.AppName + "租户id为:" + model.TenantId); //设置该导入任务到单台机器线程的redis里 RedisSetOne(key, jsonValue); Loggging.Debug("当前出队model序列化为" + jsonValue + "其队列顺序为:" + i); RedisLRem(jsonValue); //从消息队列里移除该元素 Loggging.Debug("队列长度为" + RedisGetLength(rkey)); break; //找到可以处理的元素后就跳出循环 } else //如果没有获取到该分组的锁,则该model不需要出队,设置返回的model为null并且设置当前线程空置 { model = null; RedisSetOne(key, ""); Loggging.Debug("线程" + ImportGroupQueue.ThreadRedisKeyPrefix + threadIndex + "获取分组锁:" + groupKey + "失败"); } } else { RedisSetOne(key, ""); } } return model; }
我们都知道Redis是单线程的,所以天然不存在并发问题,name我们来看看如何通过Redis来实现队列锁和分组锁。