基于redis和shedlock实现分布式锁(超简单)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 基于redis和shedlock实现分布式锁(超简单)

一、背景



线上部署了两台服务器,通过nginx轮询的方式进行负载均衡。但是这样存在一个问题同一个用户的session共享问题。你或许会说,使用ipHash模式就可以解决session共享的问题,是的确实可以解决这个问题,但是同样也会带来另外一个问题,就是一台服务器很繁忙,另外一台服务器闲置的情况。所以为了避免服务器闲置的现象,我们采用了ip轮询和共享session存入redis的解决方案。不过今天要讲的主题不是这个,我们在单机测试的时候完全运行正常,但是部署了到正式的环境的时候出现了客户支付金额对应不上的情况。后来经过排查发现是并发问题,虽然单机做了并发处理,但是如果有多台服务器的时候就会出现服务器之间的并发问题。所以需要引入分布式锁,解决多服务器间的。


二、分布式锁的实现



1. jar包的引入


<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>net.javacrumbs.shedlock</groupId>
            <artifactId>shedlock-provider-redis-spring</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>net.javacrumbs.shedlock</groupId>
            <artifactId>shedlock-spring</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- swagger -->
        <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.6</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.9.2</version>
        </dependency>


2. redis的配置


  1. 配置文件
#redis
redis.host=192.168.1.6
redis.password=
redis.port=6379
redis.taskScheduler.poolSize=100
redis.taskScheduler.defaultLockMaxDurationMinutes=10
redis.default.timeout=10
redisCache.expireTimeInMilliseconds=1200000


  1. 配置类
package com.example.redis_demo_limit.redis;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.redis.spring.RedisLockProvider;
import net.javacrumbs.shedlock.spring.ScheduledLockConfiguration;
import net.javacrumbs.shedlock.spring.ScheduledLockConfigurationBuilder;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
@Configuration
public class RedisConfig {
    @Value("${redis.host}")
    private String redisHost;
    @Value("${redis.port}")
    private int redisPort;
    @Value("${redis.password}")
    private String password;
    @Value("${redis.taskScheduler.poolSize}")
    private int tasksPoolSize;
    @Value("${redis.taskScheduler.defaultLockMaxDurationMinutes}")
    private int lockMaxDuration;
    @Bean(destroyMethod = "shutdown")
    ClientResources clientResources() {
        return DefaultClientResources.create();
    }
    @Bean
    public RedisStandaloneConfiguration redisStandaloneConfiguration() {
        RedisStandaloneConfiguration redisStandaloneConfiguration =
                new RedisStandaloneConfiguration(redisHost, redisPort);
        if (password != null && !password.trim().equals("")) {
            RedisPassword redisPassword = RedisPassword.of(password);
            redisStandaloneConfiguration.setPassword(redisPassword);
        }
        return redisStandaloneConfiguration;
    }
    @Bean
    public ClientOptions clientOptions() {
        return ClientOptions.builder()
                .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
                .autoReconnect(true).build();
    }
    @Bean
    LettucePoolingClientConfiguration lettucePoolConfig(ClientOptions options, ClientResources dcr) {
        return LettucePoolingClientConfiguration.builder().poolConfig(new GenericObjectPoolConfig())
                .clientOptions(options).clientResources(dcr).build();
    }
    @Bean
    public RedisConnectionFactory connectionFactory(
            RedisStandaloneConfiguration redisStandaloneConfiguration,
            LettucePoolingClientConfiguration lettucePoolConfig) {
        return new LettuceConnectionFactory(redisStandaloneConfiguration, lettucePoolConfig);
    }
    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    @Primary
    public RedisTemplate<Object, Object> redisTemplate(
            RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
    @Bean
    public LockProvider lockProvider(RedisConnectionFactory connectionFactory) {
        return new RedisLockProvider(connectionFactory);
    }
    @Bean
    public ScheduledLockConfiguration taskSchedulerLocker(LockProvider lockProvider) {
        return ScheduledLockConfigurationBuilder.withLockProvider(lockProvider)
                .withPoolSize(tasksPoolSize).withDefaultLockAtMostFor(Duration.ofMinutes(lockMaxDuration))
                .build();
    }
}


  1. 操作类
package com.example.redis_demo_limit.redis;
public interface DataCacheRepository<T> {
  boolean add(String collection, String hkey, T object, Long timeout);
  boolean delete(String collection, String hkey);
  T find(String collection, String hkey, Class<T> tClass);
  Boolean isAvailable();
  /**
   * redis 加锁
   * 
   * @param key
   * @param second
   * @return
   */
  Boolean lock(String key, String value, Long second);
  Object getValue(String key);
  /**
   * redis 解锁
   * 
   * @param key
   * @return
   */
  void unLock(String key);
  void setIfAbsent(String key, long value, long ttl);
  void increment(String key);
  Long get(String key);
  void set(String key, long value, long ttl);
  void set(Object key, Object value, long ttl);
  Object getByKey(String key);
  void getLock(String key, String clientID) throws Exception;
  void releaseLock(String key, String clientID);
  boolean hasKey(String key);
}


实现类

package com.example.redis_demo_limit.redis;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.stereotype.Repository;
import java.time.Duration;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
@Slf4j
@Repository
public class CacheRepository<T> implements com.example.redis_demo_limit.redis.DataCacheRepository<T> {
  private static final ObjectMapper OBJECT_MAPPER;
  private static final TimeZone DEFAULT_TIMEZONE = TimeZone.getTimeZone("UTC");
  static {
    OBJECT_MAPPER = new ObjectMapper();
    OBJECT_MAPPER.setTimeZone(DEFAULT_TIMEZONE);
  }
  Logger logger = LoggerFactory.getLogger(CacheRepository.class);
  @Autowired
  RedisTemplate template; // and we're in business
  @Value("${redis.default.timeout}00")
  Long defaultTimeOut;
  public boolean addPermentValue(String collection, String hkey, T object) {
    try {
      String jsonObject = OBJECT_MAPPER.writeValueAsString(object);
      template.opsForHash().put(collection, hkey, jsonObject);
      return true;
    } catch (Exception e) {
      logger.error("Unable to add object of key {} to cache collection '{}': {}", hkey, collection,
          e.getMessage());
      return false;
    }
  }
  @Override
  public boolean add(String collection, String hkey, T object, Long timeout) {
    Long localTimeout;
    if (timeout == null) {
      localTimeout = defaultTimeOut;
    } else {
      localTimeout = timeout;
    }
    try {
      String jsonObject = OBJECT_MAPPER.writeValueAsString(object);
      template.opsForHash().put(collection, hkey, jsonObject);
      template.expire(collection, localTimeout, TimeUnit.SECONDS);
      return true;
    } catch (Exception e) {
      logger.error("Unable to add object of key {} to cache collection '{}': {}", hkey, collection,
          e.getMessage());
      return false;
    }
  }
  @Override
  public boolean delete(String collection, String hkey) {
    try {
      template.opsForHash().delete(collection, hkey);
      return true;
    } catch (Exception e) {
      logger.error("Unable to delete entry {} from cache collection '{}': {}", hkey, collection,
          e.getMessage());
      return false;
    }
  }
  @Override
  public T find(String collection, String hkey, Class<T> tClass) {
    try {
      String jsonObj = String.valueOf(template.opsForHash().get(collection, hkey));
      return OBJECT_MAPPER.readValue(jsonObj, tClass);
    } catch (Exception e) {
      if (e.getMessage() == null) {
        logger.error("Entry '{}' does not exist in cache", hkey);
      } else {
        logger.error("Unable to find entry '{}' in cache collection '{}': {}", hkey, collection,
            e.getMessage());
      }
      return null;
    }
  }
  @Override
  public Boolean isAvailable() {
    try {
      return template.getConnectionFactory().getConnection().ping() != null;
    } catch (Exception e) {
      logger.warn("Redis server is not available at the moment.");
    }
    return false;
  }
  @Override
  public Boolean lock(String key, String value, Long second) {
    Boolean absent = template.opsForValue().setIfAbsent(key, value, second, TimeUnit.SECONDS);
    return absent;
  }
  @Override
  public Object getValue(String key) {
    return template.opsForValue().get(key);
  }
  @Override
  public void unLock(String key) {
    template.delete(key);
  }
  @Override
  public void increment(String key) {
    RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory());
    counter.incrementAndGet();
  }
  @Override
  public void setIfAbsent(String key, long value, long ttl) {
    ValueOperations<String, Object> ops = template.opsForValue();
    ops.setIfAbsent(key, value, Duration.ofSeconds(ttl));
  }
  @Override
  public Long get(String key) {
    RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory());
    return counter.get();
  }
  @Override
  public void set(String key, long value, long ttl) {
    RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory());
    counter.set(value);
    counter.expire(ttl, TimeUnit.SECONDS);
  }
  @Override
  public void set(Object key, Object value, long ttl) {
    template.opsForValue().set(key, value, ttl, TimeUnit.SECONDS);
  }
  @Override
  public Object getByKey(String key) {
    return template.opsForValue().get(key);
  }
  @Override
  public void getLock(String key, String clientID) throws Exception {
    Boolean lock = false;
    // 重试3次,每间隔1秒重试1次
    for (int j = 0; j <= 3; j++) {
      lock = lock(key, clientID, 10L);
      if (lock) {
        log.info("获得锁》》》" + key);
        break;
      }
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        log.error("线程休眠异常", e);
        break;
      }
    }
    // 重试3次依然没有获取到锁,那么返回服务器繁忙,请稍后重试
    if (!lock) {
      throw new Exception("服务繁忙");
    }
  }
  @Override
  public void releaseLock(String key, String clientID) {
    if (clientID.equals(getByKey(key))) {
      unLock(key);
    }
  }
  @Override
  public boolean hasKey(String key) {
    return template.hasKey(key);
  }
}


三、使用方法



import com.example.redis_demo_limit.annotation.LimitedAccess;
import com.example.redis_demo_limit.redis.DataCacheRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;
@Slf4j
@RestController
@RequestMapping("/redis")
public class RedisController {
    private static final String KEY = "key";
    @Resource
    private DataCacheRepository dataCacheRepository;
    @LimitedAccess(frequency = 1,second = 1)
    @PostMapping("/add")
    public String add(String str){
        dataCacheRepository.set("str","add success",200L);
        return "success";
    }
    //分布式锁使用示例
    @PostMapping("/pay")
    public String pay(String userName,Integer account){
        String clientID = UUID.randomUUID().toString();
        //设置锁的过期时间,避免死锁
        Boolean lock = dataCacheRepository.lock(userName, clientID, 6000L);
        if(!lock){
            log.info("未获取到锁{}", userName);
            return "程序繁忙,请稍后再试!";
        }
        try {
            //等待5s,方便测试
            Thread.sleep(5000);
            if(dataCacheRepository.hasKey(KEY)){
                Long aLong = dataCacheRepository.get(KEY);
                dataCacheRepository.set(KEY,aLong+account,-1);
                return account+aLong+"";
            }else {
                dataCacheRepository.set(KEY,account,-1);
                return account+"";
            }
        } catch (InterruptedException e) {
            log.error(e.getMessage(),e);
            return "程序运行异常,请联系管理员!";
        } finally {
            if (clientID.equals(dataCacheRepository.getByKey(userName))) {
                log.info("finally删除锁{}", userName);
                dataCacheRepository.unLock(userName);
            }
        }
    }
}


四、结果测试



  1. 先在8082点击pay

640.png


  1. 再在8081点击pay
  2. 640.png

  3. 从结果来看,分布式锁起到了作用。很简单吧,还不来试一下!


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
12天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
47 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
61 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
48 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
78 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
56 3
|
1月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
机器学习/深度学习 缓存 NoSQL
|
缓存 NoSQL Java
为什么分布式一定要有redis?
1、为什么使用redis 分析:博主觉得在项目中使用redis,主要是从两个角度去考虑:性能和并发。当然,redis还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件(如zookpeer等)代替,并不是非要使用redis。
1364 0
|
1月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
77 6
下一篇
无影云桌面