大家好,我是小悟。
一、分布式锁概述
1.1 什么是分布式锁
分布式锁是在分布式系统中协调多个节点对共享资源进行互斥访问的机制。当多个服务实例需要访问共享资源时,分布式锁能确保同一时刻只有一个实例可以执行关键代码段。
1.2 分布式锁的特性
- 互斥性:同一时刻只有一个客户端能持有锁
- 可重入性:同一个客户端可以多次获取同一把锁
- 锁超时:避免死锁,锁应有自动过期机制
- 高可用:锁服务需要高可用,避免单点故障
- 高性能:获取和释放锁的操作应该高效
1.3 常见实现方式
- 基于数据库(如MySQL唯一索引)
- 基于Redis(推荐,性能好)
- 基于ZooKeeper(可靠性高)
- 基于ETCD
二、基于Redis的分布式锁实现(推荐)
2.1 项目结构
src/main/java/com/example/distributedlock/ ├── DistributedLockApplication.java ├── config/ │ ├── RedisConfig.java │ └── RedissonConfig.java ├── service/ │ ├── DistributedLockService.java │ └── impl/ │ ├── RedisDistributedLock.java │ └── RedissonDistributedLock.java ├── annotation/ │ └── DistributedLock.java ├── aspect/ │ └── DistributedLockAspect.java └── controller/ └── TestController.java
2.2 添加依赖
<!-- pom.xml --> <dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- Redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.2</version> </dependency> <!-- AOP --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies>
2.3 配置文件
# application.yml spring: redis: host: localhost port: 6379 password: database: 0 timeout: 3000ms lettuce: pool: max-active: 8 max-idle: 8 min-idle: 0 max-wait: -1ms # Redisson配置(可选) redisson: config: | singleServerConfig: address: "redis://localhost:6379" database: 1
2.4 基于RedisTemplate的实现
2.4.1 Redis配置
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); // 设置序列化 Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL); jacksonSerializer.setObjectMapper(om); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(jacksonSerializer); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } }
2.4.2 分布式锁接口
public interface DistributedLockService { /** * 获取锁 * @param lockKey 锁key * @param requestId 请求标识(用于释放锁时验证) * @param expireTime 过期时间(毫秒) * @param waitTime 等待时间(毫秒) * @return 是否获取成功 */ boolean tryLock(String lockKey, String requestId, long expireTime, long waitTime); /** * 释放锁 * @param lockKey 锁key * @param requestId 请求标识 * @return 是否释放成功 */ boolean releaseLock(String lockKey, String requestId); /** * 自动续期的锁 * @param lockKey 锁key * @param requestId 请求标识 * @param expireTime 过期时间 * @param waitTime 等待时间 * @param action 要执行的业务逻辑 * @return 执行结果 */ <T> T tryLockWithAutoRenew(String lockKey, String requestId, long expireTime, long waitTime, Supplier<T> action); }
2.4.3 Redis分布式锁实现
@Service @Slf4j public class RedisDistributedLock implements DistributedLockService { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String LOCK_PREFIX = "LOCK:"; private static final String LUA_SCRIPT_RELEASE = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; @Override public boolean tryLock(String lockKey, String requestId, long expireTime, long waitTime) { String key = LOCK_PREFIX + lockKey; long start = System.currentTimeMillis(); try { while (System.currentTimeMillis() - start < waitTime) { // 使用SET命令实现原子性操作:NX表示不存在时才设置,PX设置过期时间 Boolean success = redisTemplate.opsForValue() .setIfAbsent(key, requestId, expireTime, TimeUnit.MILLISECONDS); if (Boolean.TRUE.equals(success)) { log.info("获取锁成功, key: {}, requestId: {}", key, requestId); return true; } // 等待随机时间后重试,避免惊群效应 Thread.sleep(getRandomWaitTime()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("获取锁被中断", e); } return false; } @Override public boolean releaseLock(String lockKey, String requestId) { String key = LOCK_PREFIX + lockKey; // 使用Lua脚本保证原子性:只有requestId匹配时才删除 DefaultRedisScript<Long> script = new DefaultRedisScript<>(); script.setScriptText(LUA_SCRIPT_RELEASE); script.setResultType(Long.class); Long result = redisTemplate.execute(script, Collections.singletonList(key), requestId); boolean success = result != null && result == 1; if (success) { log.info("释放锁成功, key: {}, requestId: {}", key, requestId); } else { log.warn("释放锁失败, key: {}, requestId: {}", key, requestId); } return success; } @Override public <T> T tryLockWithAutoRenew(String lockKey, String requestId, long expireTime, long waitTime, Supplier<T> action) { if (!tryLock(lockKey, requestId, expireTime, waitTime)) { throw new RuntimeException("获取分布式锁失败"); } // 启动自动续期线程 AutoRenewTask renewTask = new AutoRenewTask(lockKey, requestId, expireTime); Thread renewThread = new Thread(renewTask); renewThread.setDaemon(true); renewThread.start(); try { // 执行业务逻辑 return action.get(); } finally { // 停止续期并释放锁 renewTask.stop(); releaseLock(lockKey, requestId); } } /** * 自动续期任务 */ private class AutoRenewTask implements Runnable { private final String lockKey; private final String requestId; private final long expireTime; private volatile boolean running = true; public AutoRenewTask(String lockKey, String requestId, long expireTime) { this.lockKey = lockKey; this.requestId = requestId; this.expireTime = expireTime; } public void stop() { this.running = false; } @Override public void run() { String key = LOCK_PREFIX + lockKey; // 续期间隔设置为过期时间的1/3 long renewInterval = expireTime / 3; while (running) { try { Thread.sleep(renewInterval); // 检查锁是否存在且requestId匹配,然后续期 Object currentRequestId = redisTemplate.opsForValue().get(key); if (requestId.equals(currentRequestId)) { redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS); log.debug("锁续期成功, key: {}", key); } else { log.warn("锁已不属于当前请求, 停止续期"); break; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } } private long getRandomWaitTime() { return (long) (Math.random() * 100 + 50); // 50-150ms随机等待 } }
2.5 基于Redisson的实现(更推荐)
2.5.1 Redisson配置
@Configuration public class RedissonConfig { @Value("${spring.redis.host:localhost}") private String redisHost; @Value("${spring.redis.port:6379}") private int redisPort; @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress(String.format("redis://%s:%d", redisHost, redisPort)) .setDatabase(0) .setConnectionPoolSize(10) .setConnectionMinimumIdleSize(5) .setTimeout(3000); return Redisson.create(config); } }
2.5.2 Redisson分布式锁实现
@Service @Primary @Slf4j public class RedissonDistributedLock implements DistributedLockService { @Autowired private RedissonClient redissonClient; @Override public boolean tryLock(String lockKey, String requestId, long expireTime, long waitTime) { RLock lock = redissonClient.getLock("LOCK:" + lockKey); try { // Redisson自动处理锁续期和重入 return lock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("获取锁被中断", e); return false; } } @Override public boolean releaseLock(String lockKey, String requestId) { RLock lock = redissonClient.getLock("LOCK:" + lockKey); // 检查锁是否被当前线程持有 if (lock.isHeldByCurrentThread()) { lock.unlock(); log.info("Redisson锁释放成功, key: {}", lockKey); return true; } return false; } @Override public <T> T tryLockWithAutoRenew(String lockKey, String requestId, long expireTime, long waitTime, Supplier<T> action) { RLock lock = redissonClient.getLock("LOCK:" + lockKey); try { if (lock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS)) { try { return action.get(); } finally { // Redisson会自动续期,这里只需要在业务完成后释放 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } else { throw new RuntimeException("获取分布式锁失败"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("获取锁被中断", e); } } }
2.6 注解方式使用分布式锁
2.6.1 定义注解
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { /** * 锁的key,支持SpEL表达式 */ String key() default ""; /** * 锁前缀 */ String prefix() default "LOCK:"; /** * 过期时间(毫秒),默认30秒 */ long expire() default 30000; /** * 等待时间(毫秒),默认3秒 */ long waitTime() default 3000; /** * 是否使用公平锁 */ boolean fair() default false; /** * 获取锁失败时的错误信息 */ String errorMsg() default "系统繁忙,请稍后重试"; }
2.6.2 实现切面
@Aspect @Component @Slf4j public class DistributedLockAspect { @Autowired private DistributedLockService distributedLockService; @Autowired private RedissonClient redissonClient; private static final String REQUEST_ID_PREFIX = "LOCK_"; private static final AtomicLong REQUEST_COUNTER = new AtomicLong(0); @Around("@annotation(distributedLock)") public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable { // 生成请求ID String requestId = generateRequestId(); // 解析锁的key(支持SpEL) String lockKey = resolveLockKey(joinPoint, distributedLock); String fullLockKey = distributedLock.prefix() + lockKey; // 获取锁 RLock lock = null; if (distributedLock.fair()) { lock = redissonClient.getFairLock(fullLockKey); } else { lock = redissonClient.getLock(fullLockKey); } boolean locked = false; try { // 尝试获取锁 locked = lock.tryLock(distributedLock.waitTime(), distributedLock.expire(), TimeUnit.MILLISECONDS); if (!locked) { log.warn("获取分布式锁失败, key: {}", fullLockKey); throw new RuntimeException(distributedLock.errorMsg()); } log.info("获取分布式锁成功, key: {}, requestId: {}", fullLockKey, requestId); // 执行目标方法 return joinPoint.proceed(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("获取锁被中断", e); } finally { if (locked && lock.isHeldByCurrentThread()) { lock.unlock(); log.info("释放分布式锁, key: {}", fullLockKey); } } } private String generateRequestId() { String localIp = getLocalIp(); long timestamp = System.currentTimeMillis(); long sequence = REQUEST_COUNTER.incrementAndGet(); return String.format("%s_%s_%d_%d", REQUEST_ID_PREFIX, localIp, timestamp, sequence); } private String getLocalIp() { try { return InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { return "127.0.0.1"; } } private String resolveLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); Object[] args = joinPoint.getArgs(); Object target = joinPoint.getTarget(); // 如果key为空,使用类名+方法名 String key = distributedLock.key(); if (StringUtils.isEmpty(key)) { return target.getClass().getName() + ":" + method.getName(); } // 解析SpEL表达式 if (key.contains("#")) { ExpressionParser parser = new SpelExpressionParser(); Expression expression = parser.parseExpression(key); EvaluationContext context = new StandardEvaluationContext(); context.setVariable("args", args); // 设置参数名 String[] paramNames = signature.getParameterNames(); for (int i = 0; i < paramNames.length; i++) { context.setVariable(paramNames[i], args[i]); } return expression.getValue(context, String.class); } return key; } }
2.7 使用示例
@RestController @RequestMapping("/api/test") @Slf4j public class TestController { @Autowired private DistributedLockService distributedLockService; /** * 方式1:手动使用锁 */ @PostMapping("/order/{orderId}") public ResponseEntity<?> createOrder(@PathVariable String orderId) { String requestId = UUID.randomUUID().toString(); String lockKey = "order_create:" + orderId; // 使用自动续期的锁 String result = distributedLockService.tryLockWithAutoRenew( lockKey, requestId, 30000, // 30秒过期 3000, // 等待3秒 () -> { // 业务逻辑 log.info("处理订单创建, orderId: {}", orderId); try { Thread.sleep(1000); // 模拟业务处理 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "订单创建成功"; } ); return ResponseEntity.ok(result); } /** * 方式2:使用注解 */ @DistributedLock( key = "'inventory:' + #productId", // 使用SpEL expire = 30000, waitTime = 5000, errorMsg = "库存操作过于频繁,请稍后重试" ) @PostMapping("/inventory/{productId}") public ResponseEntity<?> updateInventory( @PathVariable String productId, @RequestParam int quantity) { log.info("更新库存, productId: {}, quantity: {}", productId, quantity); // 业务逻辑 try { Thread.sleep(2000); // 模拟业务处理 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return ResponseEntity.ok("库存更新成功"); } /** * 方式3:扣减库存示例(防超卖) */ @DistributedLock( key = "'product_stock:' + #productId", expire = 10000, waitTime = 3000 ) @PostMapping("/deduct/{productId}") public ResponseEntity<?> deductStock( @PathVariable String productId, @RequestParam int deductQuantity) { // 这里应该是数据库操作,为了示例使用伪代码 log.info("扣减库存, productId: {}, quantity: {}", productId, deductQuantity); // 1. 查询当前库存 // 2. 检查库存是否充足 // 3. 更新库存 // 4. 记录流水 return ResponseEntity.ok(Map.of( "success", true, "productId", productId, "deducted", deductQuantity )); } }
2.8 主启动类
@SpringBootApplication @EnableAspectJAutoProxy public class DistributedLockApplication { public static void main(String[] args) { SpringApplication.run(DistributedLockApplication.class, args); } }
三、其他实现方式
3.1 基于数据库的分布式锁
@Service @Slf4j public class DatabaseDistributedLock implements DistributedLockService { @Autowired private JdbcTemplate jdbcTemplate; @Override public boolean tryLock(String lockKey, String requestId, long expireTime, long waitTime) { long start = System.currentTimeMillis(); String sql = "INSERT INTO distributed_lock(lock_key, request_id, " + "expire_time, create_time) VALUES (?, ?, ?, ?)"; while (System.currentTimeMillis() - start < waitTime) { try { // 清理过期锁 cleanExpiredLocks(); // 尝试插入记录(利用唯一索引实现互斥) jdbcTemplate.update(sql, lockKey, requestId, System.currentTimeMillis() + expireTime, System.currentTimeMillis()); return true; } catch (DuplicateKeyException e) { // 锁已存在,等待后重试 try { Thread.sleep(100); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); return false; } } } return false; } @Override public boolean releaseLock(String lockKey, String requestId) { String sql = "DELETE FROM distributed_lock WHERE lock_key = ? " + "AND request_id = ?"; int rows = jdbcTemplate.update(sql, lockKey, requestId); return rows > 0; } private void cleanExpiredLocks() { String sql = "DELETE FROM distributed_lock WHERE expire_time < ?"; jdbcTemplate.update(sql, System.currentTimeMillis()); } }
四、总结
4.1 实现要点总结
- 原子性保证
- Redis使用SET NX PX命令或Lua脚本
- Redisson内部已实现原子操作
- 数据库使用唯一索引
- 避免死锁
- 必须设置锁超时时间
- 实现自动续期机制
- finally块中确保锁释放
- 锁续期机制
- 对于长任务需要自动续期
- 续期间隔应小于锁过期时间
- 续期前验证锁所有权
- 可重入性
- Redisson天然支持可重入锁
- 手动实现需要记录重入次数
4.2 方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
| Redis | 性能高,实现简单 | 需要保证Redis高可用 | 高并发场景 |
| Redisson | 功能完善,自动续期 | 依赖Redisson客户端 | 生产环境推荐 |
| 数据库 | 无需额外组件 | 性能差,有死锁风险 | 低频操作 |
| ZooKeeper | 可靠性高 | 性能一般,部署复杂 | 强一致性场景 |
4.3 最佳实践建议
- 锁粒度要小:尽量缩小锁的范围,减少锁持有时间
- 合理设置超时:根据业务执行时间合理设置锁超时时间
- 监控告警:监控锁的获取失败率和等待时间
- 降级方案:锁服务不可用时要有降级策略
- 键名设计:使用有意义的锁键名,便于排查问题
- 避免锁嵌套:尽量避免分布式锁嵌套使用
4.4 生产环境注意事项
- Redis集群模式:使用Redlock算法或Redisson的MultiLock
- 网络分区:考虑脑裂问题,设置合理的超时时间
- 性能监控:监控Redis内存、CPU和网络延迟
- 容灾方案:主从切换时可能导致锁失效,需要评估风险
4.5 推荐方案
对于大多数SpringBoot项目,推荐使用Redisson实现分布式锁,因为:
- 功能完善,支持多种锁类型(可重入、公平、读写锁等)
- 自动续期机制,避免业务未完成锁已过期
- 与SpringBoot良好集成
- 社区活跃,文档完善
通过以上实现,可以在SpringBoot应用中可靠地实现分布式锁,解决分布式环境下的资源竞争问题。
谢谢你看我的文章,既然看到这里了,如果觉得不错,随手点个赞、转发、在看三连吧,感谢感谢。那我们,下次再见。
您的一键三连,是我更新的最大动力,谢谢
山水有相逢,来日皆可期,谢谢阅读,我们再会
我手中的金箍棒,上能通天,下能探海