技术特性
在以上的实现逻辑中,用到了Redis的几个经典的特性:队列锁、分组锁、指定元素出队
队列锁
队列锁即对队列的控制权,我们可以设定一个约定好的常量值即可,使用Redis的特性SetNx操作,天然的原子操作:只在键 key 不存在的情况下, 将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作,需要注意的是队列锁需要设置过期时间,如果线程意外关闭没有来得及释放队列锁,会导致死锁。
/// <summary> /// 获取缓存队列的队列锁 /// </summary> /// <returns></returns> public static bool GetQueueLock() { //申请成功标志 const string queueLockkey = "QueueLock"; const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { //redis中,如果返回true设置成功代表队列锁空闲,如果返回false设置失败表明队列锁正在被持有 if (RedisSetNx(queueLockkey)) //如果为true,设置队列锁并设置该锁的过期时间 { RedisExpire(queueLockkey, 600);//设置过期时间为10分钟 return true; } } } catch (Exception ex) { //进行查询异常操作 Loggging.Error($"在redis 设置队列锁[{queueLockkey}]异常", ex); //抛出异常 } return false; } /// <summary> /// 给缓存队列的队列解锁 /// </summary> /// <returns></returns> public static bool QueueUnLock() { //申请成功标志 const string queueLockkey = "QueueLock"; const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { return RedisDeleteKey(queueLockkey); //redis中,如果返回true设置成功代表原来不存在这样的分组,如果返回false设置失败表明原来存在这样的分组 } } catch (Exception ex) { //进行查询异常操作 Loggging.Error($"在redis 解除队列锁[{queueLockkey}]异常", ex); //抛出异常 } return false; }
SetNx的执代码如下:
/// <summary> /// 如果返回true设置成功代表原来不存在这样的锁,如果返回false设置失败表明原来存在这样的锁 /// </summary> /// <param name="key"></param> /// <returns></returns> public static bool RedisSetNx(string key) { const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { return redis.SetNx(key, StringToBytes(key)); } } catch (Exception ex) { //进行锁创建异常的操作 Loggging.Error($"在redis setNx[{key}]:[{key}]异常", ex); } finally { //进行锁创建成功或失败的操作 Loggging.Info($"在redis setNx[{key}]:[{key}]"); } return false; }
分组锁
同理,分组锁也是通过类似形式实现的,需要注意的是分组锁同样需要设置过期时间,如果线程意外关闭没有来得及释放分组锁,会导致死锁。
/// <summary> ///给分组加分组锁 /// </summary> /// <returns></returns> public static bool GetGroupLock(string groupKey) { const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { //redis中,如果返回true设置成功代表分组锁空闲,如果返回false设置失败表明队分组锁正在被持有 if (RedisSetNx(groupKey)) //如果为true,设置分组锁并设置该锁的过期时间 { RedisExpire(groupKey, 300);//设置过期时间为5分钟 return true; } } } catch (Exception ex) { //进行查询异常操作 Loggging.Error($"在redis 设置分组锁[{groupKey}]异常", ex); //抛出异常 } return false; } /// <summary> /// 给分组锁解锁 /// </summary> /// <returns></returns> public static bool GroupUnLock(ImportRequestDataModel model) { //申请成功标志 const int tenantId = TenantIdInRedis; var groupKey = ImportParallelismHelper.GetMessageGroupId(model.MetaObjName, model.TenantId); try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { return RedisDeleteKey(groupKey); //redis中,如果返回true设置成功代表原来不存在这样的分组,如果返回false设置失败表明原来存在这样的分组 } } catch (Exception ex) { //进行查询异常操作 Loggging.Error($"在redis 解除分组锁[{groupKey}]异常", ex); //抛出异常 } return false; }
相同元素出队
这也算队列的一个难题,因为队列没有从中间出队的,搜索后才发现,其实可以通过相同元素出队的方式处理:
/// <summary> /// 从队尾开始删除第一个与model相同的元素 /// </summary> /// <param name="value"></param> private static void RedisLRem(string value) { //申请成功标志 const int tenantId = TenantIdInRedis; try { using (var redis = new RedisNativeProviderV2(KeySpaceInRedis, tenantId)) { //redis中,如果返回true设置成功代表原来不存在这样的分组,如果返回false设置失败表明原来存在这样的分组 redis.LRem(ImportQueueRedisKey, -1, StringToBytes(value)); } } catch (Exception ex) { //进行锁创建异常的操作 Loggging.Error($"在redis RedisLRem[{ImportQueueRedisKey}]异常", ex); } finally { //进行锁创建成功或失败的操作 Loggging.Info($"在redis RedisLRem[{ImportQueueRedisKey}]异常"); } }
感觉还是在实战中能更加深刻的理解理论,当然过程还是很痛苦的,核对方案,考虑各种失败的可能性,以及对Redis的理解,尤其困难的是分布式的代码很难调试,只能通过日志来模拟场景。总之算是一个较大的提升吧,在实践中训练之后要再从理论中获取更多知识更好的服务于实践吧!可以了解了解SetNX加过期时间的整体原子操作。