redis分布式锁小试

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 一、场景   项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。

一、场景

  项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。

二、解决思路

  将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。

三、主要代码

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {
    /**
     * redis的key
     * @return
     */
    String value();
    /**
     * 持锁时间,单位毫秒,默认一分钟
     */
    long keepMills() default 60000;
    /**
     * 当获取失败时候动作
     */
    LockFailAction action() default LockFailAction.GIVEUP;
    
    public enum LockFailAction{
        /**
         * 放弃
         */
        GIVEUP,
        /**
         * 继续
         */
        CONTINUE;
    }
    /**
     * 睡眠时间,设置GIVEUP忽略此项
     * @return
     */
    long sleepMills() default 500;
}

 

import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Component
@Aspect
public class RedisLockAspect {
    private static final Log log = LogFactory.getLog(RedisLockAspect.class);

    @Autowired
    private RedisCacheTemplate.RedisLockOperation redisLockOperation;

    @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" +
            "&& @annotation(me.ele.api.portal.service.redis.RedisLock)")
    private void lockPoint(){}

    @Around("lockPoint()")
    public Object arround(ProceedingJoinPoint pjp) throws Throwable{
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        RedisLock lockInfo = method.getAnnotation(RedisLock.class);
        
     /*
         String lockKey = lockInfo.value();
if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {
DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];
lockKey += deployMessage.getEnv();
System.out.println(lockKey);
}
     */
        boolean lock = false;
        Object obj = null;
        while(!lock){
            long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
            lock = setNX(lockInfo.value(), timestamp);
            //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)
            long now = System.currentTimeMillis();
            if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
                log.info("得到redis分布式锁...");
                obj = pjp.proceed();
                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){
                    releaseLock(lockInfo.value());
                }
            }else{
                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){
                    log.info("稍后重新请求redis分布式锁...");
                    Thread.currentThread().sleep(lockInfo.sleepMills());
                }else{
                    log.info("放弃redis分布式锁...");
                    break;
                }
            }
        }
        return obj;
    }
    private boolean setNX(String key,Long value){
        return redisLockOperation.setNX(key, value);
    }
    private long getLock(String key){
        return redisLockOperation.get(key);
    }
    private Long getSet(String key,Long value){
        return redisLockOperation.getSet(key, value);
    }
    private void releaseLock(String key){
        redisLockOperation.delete(key);
    }
}

四、遇到的问题

  

  开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。

   在一篇文章中找到了原因:SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效.

  只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。

  参考: 关于proxy模式下,@Transactional标签在创建代理对象时的应用

五、使用spring-redis中的RedisLockRegistry

import java.util.concurrent.locks.Lock;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Bean
public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,
                                           RedisTemplate redisTemplate) {
    return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);
}

Lock lock = redisLockRegistry.obtain(appId);

lock.tryLock(180, TimeUnit.SECONDS);
....
lock.unlock();  

六、参考

  其他工具类,请参考这里

七、springboot LockRegistry

  

      分布式锁-RedisLockRegistry源码分析[转]

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import redis.clients.jedis.JedisShardInfo;


@Ignore
public class RedisLockTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);
  private static final String LOCK = "xxx.xxx";
  private RedisLockRegistry redisLockRegistry;

  @Before
  public void setUp() {
    JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");
    JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);
    redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);
  }

  private class TaskA implements Runnable {

    @Override
    public void run() {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      Lock lock = redisLockRegistry.obtain(LOCK);
      try {
        lock.lock();
        LOGGER.info("Lock {} is obtained", lock);
        Thread.sleep(10);
        lock.unlock();
        LOGGER.info("Lock {} is unlocked", lock);
      } catch (Exception ex) {
        LOGGER.error("Lock {} unlock failed", lock, ex);
      }
    }
  }

  private class TimeoutTask implements Runnable {

    @Override
    public void run() {
      Lock lock = redisLockRegistry.obtain(LOCK);
      try {
        lock.lock();
        LOGGER.info("Lock {} is obtained", lock);
        Thread.sleep(5000);
        lock.unlock();
        LOGGER.info("Lock {} is unlocked", lock);
      } catch (Exception ex) {
        LOGGER.error("Lock {} unlock failed", lock, ex);
      }
    }
  }

  @Test
  public void test() throws InterruptedException, TimeoutException {
    ExecutorService service = Executors.newFixedThreadPool(2);
    service.execute(new TimeoutTask());
    service.execute(new TaskA());
    service.shutdown();
    if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new TimeoutException();
    }
  }
}

 

 

 

目录
相关文章
|
3月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
260 6
|
4月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
2月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
211 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
2月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
4月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
205 8
|
9月前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
1246 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
5月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1586 7
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
9月前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
923 83
|
6月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
516 3