一、背景
线上部署了两台服务器,通过nginx轮询的方式进行负载均衡。但是这样存在一个问题同一个用户的session共享问题。你或许会说,使用ipHash模式就可以解决session共享的问题,是的确实可以解决这个问题,但是同样也会带来另外一个问题,就是一台服务器很繁忙,另外一台服务器闲置的情况。所以为了避免服务器闲置的现象,我们采用了ip轮询和共享session存入redis的解决方案。不过今天要讲的主题不是这个,我们在单机测试的时候完全运行正常,但是部署了到正式的环境的时候出现了客户支付金额对应不上的情况。后来经过排查发现是并发问题,虽然单机做了并发处理,但是如果有多台服务器的时候就会出现服务器之间的并发问题。所以需要引入分布式锁,解决多服务器间的。
二、分布式锁的实现
1. jar包的引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-provider-redis-spring</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>shedlock-spring</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- swagger --> <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui --> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>swagger-bootstrap-ui</artifactId> <version>1.9.6</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.9.2</version> </dependency>
2. redis的配置
- 配置文件
#redis redis.host=192.168.1.6 redis.password= redis.port=6379 redis.taskScheduler.poolSize=100 redis.taskScheduler.defaultLockMaxDurationMinutes=10 redis.default.timeout=10 redisCache.expireTimeInMilliseconds=1200000
- 配置类
package com.example.redis_demo_limit.redis; import io.lettuce.core.ClientOptions; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import net.javacrumbs.shedlock.core.LockProvider; import net.javacrumbs.shedlock.provider.redis.spring.RedisLockProvider; import net.javacrumbs.shedlock.spring.ScheduledLockConfiguration; import net.javacrumbs.shedlock.spring.ScheduledLockConfigurationBuilder; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; import java.time.Duration; @Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port}") private int redisPort; @Value("${redis.password}") private String password; @Value("${redis.taskScheduler.poolSize}") private int tasksPoolSize; @Value("${redis.taskScheduler.defaultLockMaxDurationMinutes}") private int lockMaxDuration; @Bean(destroyMethod = "shutdown") ClientResources clientResources() { return DefaultClientResources.create(); } @Bean public RedisStandaloneConfiguration redisStandaloneConfiguration() { RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(redisHost, redisPort); if (password != null && !password.trim().equals("")) { RedisPassword redisPassword = RedisPassword.of(password); redisStandaloneConfiguration.setPassword(redisPassword); } return redisStandaloneConfiguration; } @Bean public ClientOptions clientOptions() { return ClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .autoReconnect(true).build(); } @Bean LettucePoolingClientConfiguration lettucePoolConfig(ClientOptions options, ClientResources dcr) { return LettucePoolingClientConfiguration.builder().poolConfig(new GenericObjectPoolConfig()) .clientOptions(options).clientResources(dcr).build(); } @Bean public RedisConnectionFactory connectionFactory( RedisStandaloneConfiguration redisStandaloneConfiguration, LettucePoolingClientConfiguration lettucePoolConfig) { return new LettuceConnectionFactory(redisStandaloneConfiguration, lettucePoolConfig); } @Bean @ConditionalOnMissingBean(name = "redisTemplate") @Primary public RedisTemplate<Object, Object> redisTemplate( RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean public LockProvider lockProvider(RedisConnectionFactory connectionFactory) { return new RedisLockProvider(connectionFactory); } @Bean public ScheduledLockConfiguration taskSchedulerLocker(LockProvider lockProvider) { return ScheduledLockConfigurationBuilder.withLockProvider(lockProvider) .withPoolSize(tasksPoolSize).withDefaultLockAtMostFor(Duration.ofMinutes(lockMaxDuration)) .build(); } }
- 操作类
package com.example.redis_demo_limit.redis; public interface DataCacheRepository<T> { boolean add(String collection, String hkey, T object, Long timeout); boolean delete(String collection, String hkey); T find(String collection, String hkey, Class<T> tClass); Boolean isAvailable(); /** * redis 加锁 * * @param key * @param second * @return */ Boolean lock(String key, String value, Long second); Object getValue(String key); /** * redis 解锁 * * @param key * @return */ void unLock(String key); void setIfAbsent(String key, long value, long ttl); void increment(String key); Long get(String key); void set(String key, long value, long ttl); void set(Object key, Object value, long ttl); Object getByKey(String key); void getLock(String key, String clientID) throws Exception; void releaseLock(String key, String clientID); boolean hasKey(String key); }
实现类
package com.example.redis_demo_limit.redis; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.data.redis.support.atomic.RedisAtomicLong; import org.springframework.stereotype.Repository; import java.time.Duration; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @Slf4j @Repository public class CacheRepository<T> implements com.example.redis_demo_limit.redis.DataCacheRepository<T> { private static final ObjectMapper OBJECT_MAPPER; private static final TimeZone DEFAULT_TIMEZONE = TimeZone.getTimeZone("UTC"); static { OBJECT_MAPPER = new ObjectMapper(); OBJECT_MAPPER.setTimeZone(DEFAULT_TIMEZONE); } Logger logger = LoggerFactory.getLogger(CacheRepository.class); @Autowired RedisTemplate template; // and we're in business @Value("${redis.default.timeout}00") Long defaultTimeOut; public boolean addPermentValue(String collection, String hkey, T object) { try { String jsonObject = OBJECT_MAPPER.writeValueAsString(object); template.opsForHash().put(collection, hkey, jsonObject); return true; } catch (Exception e) { logger.error("Unable to add object of key {} to cache collection '{}': {}", hkey, collection, e.getMessage()); return false; } } @Override public boolean add(String collection, String hkey, T object, Long timeout) { Long localTimeout; if (timeout == null) { localTimeout = defaultTimeOut; } else { localTimeout = timeout; } try { String jsonObject = OBJECT_MAPPER.writeValueAsString(object); template.opsForHash().put(collection, hkey, jsonObject); template.expire(collection, localTimeout, TimeUnit.SECONDS); return true; } catch (Exception e) { logger.error("Unable to add object of key {} to cache collection '{}': {}", hkey, collection, e.getMessage()); return false; } } @Override public boolean delete(String collection, String hkey) { try { template.opsForHash().delete(collection, hkey); return true; } catch (Exception e) { logger.error("Unable to delete entry {} from cache collection '{}': {}", hkey, collection, e.getMessage()); return false; } } @Override public T find(String collection, String hkey, Class<T> tClass) { try { String jsonObj = String.valueOf(template.opsForHash().get(collection, hkey)); return OBJECT_MAPPER.readValue(jsonObj, tClass); } catch (Exception e) { if (e.getMessage() == null) { logger.error("Entry '{}' does not exist in cache", hkey); } else { logger.error("Unable to find entry '{}' in cache collection '{}': {}", hkey, collection, e.getMessage()); } return null; } } @Override public Boolean isAvailable() { try { return template.getConnectionFactory().getConnection().ping() != null; } catch (Exception e) { logger.warn("Redis server is not available at the moment."); } return false; } @Override public Boolean lock(String key, String value, Long second) { Boolean absent = template.opsForValue().setIfAbsent(key, value, second, TimeUnit.SECONDS); return absent; } @Override public Object getValue(String key) { return template.opsForValue().get(key); } @Override public void unLock(String key) { template.delete(key); } @Override public void increment(String key) { RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory()); counter.incrementAndGet(); } @Override public void setIfAbsent(String key, long value, long ttl) { ValueOperations<String, Object> ops = template.opsForValue(); ops.setIfAbsent(key, value, Duration.ofSeconds(ttl)); } @Override public Long get(String key) { RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory()); return counter.get(); } @Override public void set(String key, long value, long ttl) { RedisAtomicLong counter = new RedisAtomicLong(key, template.getConnectionFactory()); counter.set(value); counter.expire(ttl, TimeUnit.SECONDS); } @Override public void set(Object key, Object value, long ttl) { template.opsForValue().set(key, value, ttl, TimeUnit.SECONDS); } @Override public Object getByKey(String key) { return template.opsForValue().get(key); } @Override public void getLock(String key, String clientID) throws Exception { Boolean lock = false; // 重试3次,每间隔1秒重试1次 for (int j = 0; j <= 3; j++) { lock = lock(key, clientID, 10L); if (lock) { log.info("获得锁》》》" + key); break; } try { Thread.sleep(5000); } catch (InterruptedException e) { log.error("线程休眠异常", e); break; } } // 重试3次依然没有获取到锁,那么返回服务器繁忙,请稍后重试 if (!lock) { throw new Exception("服务繁忙"); } } @Override public void releaseLock(String key, String clientID) { if (clientID.equals(getByKey(key))) { unLock(key); } } @Override public boolean hasKey(String key) { return template.hasKey(key); } }
三、使用方法
import com.example.redis_demo_limit.annotation.LimitedAccess; import com.example.redis_demo_limit.redis.DataCacheRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.UUID; @Slf4j @RestController @RequestMapping("/redis") public class RedisController { private static final String KEY = "key"; @Resource private DataCacheRepository dataCacheRepository; @LimitedAccess(frequency = 1,second = 1) @PostMapping("/add") public String add(String str){ dataCacheRepository.set("str","add success",200L); return "success"; } //分布式锁使用示例 @PostMapping("/pay") public String pay(String userName,Integer account){ String clientID = UUID.randomUUID().toString(); //设置锁的过期时间,避免死锁 Boolean lock = dataCacheRepository.lock(userName, clientID, 6000L); if(!lock){ log.info("未获取到锁{}", userName); return "程序繁忙,请稍后再试!"; } try { //等待5s,方便测试 Thread.sleep(5000); if(dataCacheRepository.hasKey(KEY)){ Long aLong = dataCacheRepository.get(KEY); dataCacheRepository.set(KEY,aLong+account,-1); return account+aLong+""; }else { dataCacheRepository.set(KEY,account,-1); return account+""; } } catch (InterruptedException e) { log.error(e.getMessage(),e); return "程序运行异常,请联系管理员!"; } finally { if (clientID.equals(dataCacheRepository.getByKey(userName))) { log.info("finally删除锁{}", userName); dataCacheRepository.unLock(userName); } } } }
四、结果测试
- 先在8082点击pay
- 再在8081点击pay
- 从结果来看,分布式锁起到了作用。很简单吧,还不来试一下!