五、分布式锁
5.1 Redis分布式锁(红锁 RedLock)
/**
 * Single-node Redis lock built on {@code SET key value NX PX} for acquisition and
 * compare-and-delete Lua scripts for release and renewal.
 *
 * <p>The {@code value} passed to every method identifies the owner: {@link #unlock} and the
 * watchdog only touch the key while it still stores that exact value, so callers should use a
 * value that is unique per acquisition.
 */
public class RedisDistributedLock {
    /** Prefix prepended to every caller-supplied key. */
    private static final String LOCK_KEY_PREFIX = "lock:";
    /** TTL used by {@link #tryLock(String, String)} when the caller gives none (30 seconds). */
    private static final long DEFAULT_EXPIRE_MS = 30000;

    /** Deletes the key only if it still holds the caller's value; returns 1 on delete, else 0. */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /**
     * Resets the TTL (in milliseconds) only if the key still holds the caller's value.
     * PEXPIRE is used instead of EXPIRE so sub-second TTLs are not truncated to 0 seconds,
     * which would delete the key instead of extending it.
     */
    private static final DefaultRedisScript<Long> RENEW_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('pexpire', KEYS[1], ARGV[2]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    private final StringRedisTemplate redisTemplate;

    /**
     * @param redisTemplate template bound to the Redis node that stores the lock keys
     * @throws IllegalArgumentException if {@code redisTemplate} is null
     */
    public RedisDistributedLock(StringRedisTemplate redisTemplate) {
        if (redisTemplate == null) {
            throw new IllegalArgumentException("redisTemplate must not be null");
        }
        this.redisTemplate = redisTemplate;
    }

    /**
     * Tries to acquire the lock up to {@code retryTimes} times, sleeping between attempts.
     *
     * @param key             lock name (without prefix)
     * @param value           owner token stored in the key
     * @param expireMs        lock TTL in milliseconds
     * @param retryTimes      total number of attempts; values below 1 make no attempt at all
     * @param retryIntervalMs pause between two attempts in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} if every attempt failed or
     *         the thread was interrupted while waiting (the interrupt flag is restored)
     */
    public boolean tryLock(String key, String value, long expireMs, int retryTimes, long retryIntervalMs) {
        for (int attempt = 0; attempt < retryTimes; attempt++) {
            if (tryLock(key, value, expireMs)) {
                return true;
            }
            if (attempt == retryTimes - 1) {
                // No further attempt follows, so sleeping would only delay the failure.
                break;
            }
            try {
                Thread.sleep(retryIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Single acquisition attempt using the default TTL.
     *
     * @return {@code true} if the lock was acquired
     */
    public boolean tryLock(String key, String value) {
        return tryLock(key, value, DEFAULT_EXPIRE_MS);
    }

    /**
     * Single acquisition attempt: sets the key only if it does not exist yet, with a TTL.
     *
     * @param key      lock name (without prefix)
     * @param value    owner token stored in the key
     * @param expireMs lock TTL in milliseconds
     * @return {@code true} only if Redis reported that the key was set
     */
    public boolean tryLock(String key, String value, long expireMs) {
        Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(LOCK_KEY_PREFIX + key, value, Duration.ofMillis(expireMs));
        return Boolean.TRUE.equals(acquired);
    }

    /**
     * Releases the lock if, and only if, it is still owned by {@code value}.
     *
     * @return {@code true} if the key was deleted by this call
     */
    public boolean unlock(String key, String value) {
        Long deleted = redisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(LOCK_KEY_PREFIX + key),
                value);
        return deleted != null && deleted == 1L;
    }

    /**
     * Periodically extends the TTL of held locks so long-running work does not lose them.
     *
     * <p>Implements {@link AutoCloseable} so the scheduler threads can be stopped; whether the
     * container invokes {@link #close()} automatically depends on the Spring version in use.
     */
    @Component
    public static class LockRenewalWatchDog implements AutoCloseable {
        private final StringRedisTemplate redisTemplate;
        private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

        /**
         * @param redisTemplate template bound to the Redis node that stores the lock keys
         * @throws IllegalArgumentException if {@code redisTemplate} is null
         */
        public LockRenewalWatchDog(StringRedisTemplate redisTemplate) {
            if (redisTemplate == null) {
                throw new IllegalArgumentException("redisTemplate must not be null");
            }
            this.redisTemplate = redisTemplate;
        }

        /**
         * Starts renewing {@code key} every {@code expireMs / 3} milliseconds. A renewal that
         * was already running for the same key is cancelled and replaced.
         *
         * @param key      lock name (without prefix)
         * @param value    owner token; renewal stops once the key no longer holds it
         * @param expireMs TTL in milliseconds applied on every renewal
         * @throws IllegalArgumentException if {@code expireMs} is not positive
         */
        public void startRenewal(String key, String value, long expireMs) {
            if (expireMs <= 0) {
                throw new IllegalArgumentException("expireMs must be positive: " + expireMs);
            }
            // scheduleAtFixedRate rejects a period of 0, which expireMs / 3 yields for TTLs < 3 ms.
            long periodMs = Math.max(1L, expireMs / 3);
            ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(
                    () -> renewOnce(key, value, expireMs),
                    periodMs, periodMs, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> previous = renewalTasks.put(key, task);
            if (previous != null) {
                previous.cancel(false);
            }
        }

        /** One renewal tick; stops the schedule when the lock is no longer owned by {@code value}. */
        private void renewOnce(String key, String value, long expireMs) {
            try {
                Long renewed = redisTemplate.execute(
                        RENEW_SCRIPT,
                        Collections.singletonList(LOCK_KEY_PREFIX + key),
                        value,
                        String.valueOf(expireMs));
                if (renewed != null && renewed == 0L) {
                    // Key is gone or owned by someone else: nothing left to renew.
                    cancelRenewal(key);
                }
            } catch (RuntimeException ignored) {
                // An exception escaping a scheduleAtFixedRate task suppresses all later runs,
                // so a transient Redis error must not propagate. The next tick retries; if the
                // lock expires in the meantime the script returns 0 and the task cancels itself.
            }
        }

        /** Stops renewing {@code key}; a no-op if no renewal is registered for it. */
        public void cancelRenewal(String key) {
            ScheduledFuture<?> task = renewalTasks.remove(key);
            if (task != null) {
                task.cancel(false);
            }
        }

        /** Cancels every renewal and stops the scheduler threads. */
        @Override
        public void close() {
            renewalTasks.values().forEach(task -> task.cancel(false));
            renewalTasks.clear();
            scheduler.shutdownNow();
        }
    }
}
// RedLock(多Redis节点)
/**
 * Lock spanning several independent Redis nodes: the lock is considered held only when a
 * majority of nodes granted it and part of the TTL is still left after acquisition.
 */
public class RedLock {
    private final List<RedisDistributedLock> redisLocks;
    /** Minimum number of nodes that must grant the lock (strict majority). */
    private final int quorum;

    /**
     * @param redisTemplates one template per independent Redis node
     * @throws IllegalArgumentException if the list is null or empty
     */
    public RedLock(List<StringRedisTemplate> redisTemplates) {
        if (redisTemplates == null || redisTemplates.isEmpty()) {
            throw new IllegalArgumentException("at least one Redis node is required");
        }
        this.redisLocks = redisTemplates.stream()
                .map(RedisDistributedLock::new)
                .collect(Collectors.toList());
        this.quorum = redisLocks.size() / 2 + 1;
    }

    /**
     * Requests the lock from every node in turn.
     *
     * @param key      lock name
     * @param value    owner token, must be the same on every node
     * @param expireMs lock TTL in milliseconds
     * @return {@code true} if a majority granted the lock and the remaining validity time
     *         (TTL minus elapsed time minus a clock-drift allowance) is still positive;
     *         otherwise every node is asked to release and {@code false} is returned
     */
    public boolean tryLock(String key, String value, long expireMs) {
        int successCount = 0;
        // nanoTime is monotonic; wall-clock adjustments must not distort the elapsed time.
        long startNanos = System.nanoTime();
        for (RedisDistributedLock lock : redisLocks) {
            try {
                if (lock.tryLock(key, value, expireMs)) {
                    successCount++;
                }
            } catch (RuntimeException ignored) {
                // An unreachable node simply counts as "not granted"; the remaining nodes
                // must still be asked, otherwise one outage would defeat the quorum design.
            }
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // Drift allowance used by Redlock reference implementations: 1% of the TTL plus 2 ms.
        long driftMs = expireMs / 100 + 2;
        long validityMs = expireMs - elapsedMs - driftMs;
        if (successCount >= quorum && validityMs > 0) {
            return true;
        }
        // Not acquired: release on every node, including ones that may have set the key
        // even though the reply was lost.
        unlock(key, value);
        return false;
    }

    /**
     * Releases the lock on every node. A failure on one node does not stop the release on the
     * others; keys that could not be deleted expire through their TTL.
     */
    public void unlock(String key, String value) {
        for (RedisDistributedLock lock : redisLocks) {
            try {
                lock.unlock(key, value);
            } catch (RuntimeException ignored) {
                // Best effort: keep releasing on the remaining nodes.
            }
        }
    }
}
5.2 ZooKeeper分布式锁
/**
 * ZooKeeper lock based on ephemeral sequential nodes: every contender creates a child under
 * the lock path, the child with the lowest sequence owns the lock, and every other contender
 * waits for the deletion of its direct predecessor.
 *
 * <p>Not thread-safe and not reentrant: an instance tracks a single node in
 * {@code currentLockNode}, so use one instance per lock owner.
 */
public class ZooKeeperDistributedLock implements AutoCloseable {
    private final CuratorFramework client;
    private final String lockPath;
    private final String lockPrefix = "/locks/";
    /** Full path of the node created by this instance, or {@code null} when none exists. */
    private String currentLockNode;

    /**
     * Starts a dedicated Curator client and makes sure the persistent lock path exists.
     *
     * @param connectString ZooKeeper connection string
     * @param lockName      name of the lock; the path is {@code /locks/<lockName>}
     * @throws RuntimeException if the lock path cannot be initialised (the client is closed first)
     */
    public ZooKeeperDistributedLock(String connectString, String lockName) {
        this.client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        this.client.start();
        this.lockPath = lockPrefix + lockName;
        try {
            ensureLockPathExists();
        } catch (Exception e) {
            restoreInterrupt(e);
            // Do not leak the started client when construction fails.
            client.close();
            throw new RuntimeException("初始化锁失败", e);
        }
    }

    /** Creates the persistent lock path unless it is already there. */
    private void ensureLockPathExists() throws Exception {
        Stat stat = client.checkExists().forPath(lockPath);
        if (stat != null) {
            return;
        }
        try {
            client.create().creatingParentsIfNeeded().forPath(lockPath);
        } catch (Exception e) {
            // Another client may have created the path between the check and the create;
            // only fail when the path is really still missing.
            if (client.checkExists().forPath(lockPath) == null) {
                throw e;
            }
        }
    }

    /**
     * Blocks until the lock is acquired.
     *
     * @throws IllegalStateException if this instance already holds (or is queued for) the lock
     * @throws RuntimeException      if acquisition fails; the node created by this call is
     *                               removed and, on interruption, the interrupt flag is restored
     */
    public void lock() {
        if (currentLockNode != null) {
            throw new IllegalStateException("lock already held by this instance: " + currentLockNode);
        }
        try {
            acquire(false, 0L);
        } catch (Exception e) {
            restoreInterrupt(e);
            // Remove our queue entry, otherwise every later contender would wait on it.
            unlock();
            throw new RuntimeException("获取锁失败", e);
        }
    }

    /**
     * Tries to acquire the lock, waiting at most the given time.
     *
     * @param timeout maximum time to wait; values below zero are treated as zero
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the lock was acquired; {@code false} on timeout, on any error,
     *         or if this instance already holds the lock. In the failure cases the node
     *         created by this call is removed again.
     */
    public boolean tryLock(long timeout, TimeUnit unit) {
        if (currentLockNode != null) {
            // Creating a second node would queue this instance behind itself.
            return false;
        }
        try {
            if (acquire(true, Math.max(0L, unit.toNanos(timeout)))) {
                return true;
            }
        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("获取锁失败", e);
        }
        // Timed out or failed: withdraw from the queue.
        unlock();
        return false;
    }

    /**
     * Creates this instance's sequential node and waits until it is the lowest child.
     *
     * @param bounded      whether {@code timeoutNanos} limits the wait
     * @param timeoutNanos maximum wait in nanoseconds when {@code bounded}
     * @return {@code true} once the lock is owned, {@code false} if the timeout elapsed first
     */
    private boolean acquire(boolean bounded, long timeoutNanos) throws Exception {
        long deadline = System.nanoTime() + timeoutNanos;
        currentLockNode = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(lockPath + "/lock_");
        String ownName = currentLockNode.substring(currentLockNode.lastIndexOf('/') + 1);
        while (true) {
            List<String> children = client.getChildren().forPath(lockPath);
            Collections.sort(children);
            int ownIndex = children.indexOf(ownName);
            if (ownIndex < 0) {
                throw new IllegalStateException("lock node no longer exists: " + currentLockNode);
            }
            if (ownIndex == 0) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (bounded && remainingNanos <= 0) {
                return false;
            }
            String predecessor = lockPath + "/" + children.get(ownIndex - 1);
            if (!awaitDeletion(predecessor, bounded, remainingNanos)) {
                return false;
            }
            // The predecessor may have given up (timeout/crash) while an earlier node still
            // holds the lock, so the position is re-evaluated instead of assuming ownership.
        }
    }

    /**
     * Waits until {@code path} no longer exists.
     *
     * @return {@code true} if the node is gone, {@code false} if the bounded wait timed out
     */
    private boolean awaitDeletion(String path, boolean bounded, long timeoutNanos) throws Exception {
        CountDownLatch deleted = new CountDownLatch(1);
        try (NodeCache cache = new NodeCache(client, path)) {
            // Register before starting so no notification can be missed, and react to
            // deletion only (the listener also fires on data changes).
            cache.getListenable().addListener(() -> {
                if (cache.getCurrentData() == null) {
                    deleted.countDown();
                }
            });
            // start(true) loads the initial state before returning.
            cache.start(true);
            if (cache.getCurrentData() == null) {
                // The node was already gone before the watch existed; no event will follow.
                deleted.countDown();
            }
            if (!bounded) {
                deleted.await();
                return true;
            }
            return deleted.await(timeoutNanos, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Releases the lock by deleting this instance's node; a no-op when no node is tracked.
     * The node reference is cleared even if the delete call fails: the delete is issued with
     * Curator's guaranteed mode, and the failure is logged.
     */
    public void unlock() {
        String node = currentLockNode;
        if (node == null) {
            return;
        }
        currentLockNode = null;
        try {
            client.delete().guaranteed().forPath(node);
        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("释放锁失败", e);
        }
    }

    /** Releases the lock (if held) and closes the Curator client owned by this instance. */
    @Override
    public void close() {
        unlock();
        client.close();
    }

    /** Re-sets the interrupt flag when {@code e} is an {@link InterruptedException}. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
六、消息中间件
6.1 消息队列的核心问题
消息丢失与可靠性保证
// 生产端保证(同步发送 + 回调确认)
/**
 * Producer-side reliability helpers: a synchronous send that verifies the broker's answer and
 * an asynchronous send with a completion callback. Failed messages are handed to the retry
 * helpers ({@code saveToRetryQueue} / {@code retryMessage}) defined elsewhere in this class.
 */
@Component
public class ReliableMessageProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends synchronously with a 3000 ms timeout and requires {@code SEND_OK}.
     *
     * @param topic   destination topic
     * @param message payload
     * @return the broker's send result (status is always {@code SEND_OK})
     * @throws RuntimeException if the send throws or the status is not {@code SEND_OK};
     *                          in both cases the message has been saved for retry first
     */
    public SendResult syncSend(String topic, Object message) {
        SendResult result;
        try {
            result = rocketMQTemplate.syncSend(topic, message, 3000);
        } catch (RuntimeException e) {
            // Timeouts and broker errors surface as exceptions, not as a status code;
            // they must reach the retry queue as well.
            saveToRetryQueue(topic, message);
            throw e;
        }
        if (result == null || result.getSendStatus() != SendStatus.SEND_OK) {
            // Record the failed message so it can be retried asynchronously.
            saveToRetryQueue(topic, message);
            throw new RuntimeException("消息发送失败: " + (result == null ? null : result.getSendStatus()));
        }
        return result;
    }

    /**
     * Sends asynchronously and reports the outcome to {@code callback}.
     *
     * @param topic    destination topic
     * @param message  payload
     * @param callback receives the result or the failure; may be {@code null}
     */
    public void asyncSend(String topic, Object message, MessageCallback callback) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (callback != null) {
                    callback.onSuccess(sendResult);
                }
            }

            @Override
            public void onException(Throwable e) {
                log.error("消息发送失败", e);
                try {
                    // Record the failed message and retry.
                    retryMessage(topic, message);
                } finally {
                    // The caller is notified even if scheduling the retry itself fails.
                    if (callback != null) {
                        callback.onFailure(e);
                    }
                }
            }
        });
    }
}
// Broker端保证(同步刷盘 + 主从同步)
// 配置:
// flushDiskType=SYNC_FLUSH(同步刷盘)
// brokerRole=SYNC_MASTER(同步双写)
// 消费端保证(业务处理成功后才确认:方法正常返回即确认,抛出异常则稍后重试)
/**
 * Consumer for {@code order_topic} in clustering mode with concurrent consumption.
 * Acknowledgement follows the method outcome: a normal return counts as consumed, an
 * exception asks for the message to be delivered again later.
 */
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group",
        messageModel = MessageModel.CLUSTERING,
        consumeMode = ConsumeMode.CONCURRENTLY)
public class OrderConsumer implements RocketMQListener<OrderMessage> {
    /**
     * Processes one order message.
     *
     * @throws RuntimeException wrapping the original failure, to trigger a later redelivery
     */
    @Override
    public void onMessage(OrderMessage message) {
        try {
            // Business logic; returning normally acknowledges the message.
            processOrder(message);
        } catch (Exception e) {
            log.error("消费失败", e);
            // Keep the cause so the root failure stays visible to whoever handles the rethrow.
            throw new RuntimeException("消费失败,稍后重试", e);
        }
    }
}
消息幂等性处理
/**
 * Three ways to make message consumption idempotent: a Redis "seen" flag, a database unique
 * index on the message id, and a short-lived Redis lock on a business key.
 */
@Component
public class IdempotentConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    private static final String PROCESSED_FLAG_PREFIX = "msg:processed:";
    private static final long EXPIRE_SECONDS = 86400; // 24 hours

    /** Deletes the key only while it still holds the caller's token. */
    private static final DefaultRedisScript<Long> RELEASE_IF_OWNER_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end",
            Long.class);

    /**
     * Redis-based duplicate check. NOTE: this call also claims the message id (the flag is
     * set as part of the check, kept for 24 hours), so the first caller gets {@code false}
     * and every later caller gets {@code true}. If processing then fails, call
     * {@link #unmarkProcessed(String)} so a redelivery is not mistaken for a duplicate.
     *
     * @return {@code true} if the id had already been claimed
     */
    public boolean isProcessed(String messageId) {
        String key = PROCESSED_FLAG_PREFIX + messageId;
        Boolean success = redisTemplate.opsForValue()
                .setIfAbsent(key, "1", Duration.ofSeconds(EXPIRE_SECONDS));
        return Boolean.FALSE.equals(success);
    }

    /** Removes the flag set by {@link #isProcessed(String)}, e.g. after a failed processing attempt. */
    public void unmarkProcessed(String messageId) {
        redisTemplate.delete(PROCESSED_FLAG_PREFIX + messageId);
    }

    /**
     * Database-based idempotence: the consumption record (unique index on {@code message_id})
     * is inserted in the same transaction as the business changes.
     */
    @Transactional
    public void consumeWithDbIdempotent(MessageRecord message) {
        try {
            // Insert the consumption record; a duplicate key means the message was seen before.
            messageRecordMapper.insert(message);
        } catch (DuplicateKeyException e) {
            log.info("重复消息,messageId={}", message.getMessageId());
            return;
        }
        // Outside the try on purpose: a duplicate-key error raised by the business logic is a
        // real failure and must propagate instead of being reported as a duplicate message.
        processBusiness(message);
    }

    /**
     * Idempotence on a business key (order id): a 30-second Redis lock keeps two consumers
     * from handling the same order at once, and the order status decides whether work is
     * still needed.
     */
    public void consumeWithBusinessIdempotent(OrderMessage message) {
        String businessKey = "order:" + message.getOrderId();
        // Unique token so the release below can tell this consumer's lock from a later one.
        String token = java.util.UUID.randomUUID().toString();
        Boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(businessKey, token, Duration.ofSeconds(30));
        if (Boolean.FALSE.equals(locked)) {
            log.info("订单正在处理中或已处理完成: {}", message.getOrderId());
            return;
        }
        try {
            // Skip orders that have already been processed.
            Order order = orderMapper.selectById(message.getOrderId());
            if (order != null && "PROCESSED".equals(order.getStatus())) {
                return;
            }
            processOrder(message);
        } finally {
            // Compare-and-delete: if processing outlived the 30 s TTL, the key may now belong
            // to another consumer and must not be removed.
            redisTemplate.execute(RELEASE_IF_OWNER_SCRIPT, Collections.singletonList(businessKey), token);
        }
    }
}
6.2 消息顺序保证
// RocketMQ顺序消息(消息队列选择器)
/**
 * Producer for ordered messages: all messages sent with the same {@code orderId} use it as
 * the queue-selection key.
 */
@Component
public class OrderlyMessageProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends {@code message} synchronously, using {@code orderId} as the selection key so that
     * messages of one order go to the same queue.
     *
     * @param topic   destination topic
     * @param message payload
     * @param orderId selection key; must not be {@code null}
     * @throws IllegalArgumentException if {@code orderId} is {@code null}
     * @throws RuntimeException         if the broker does not answer with {@code SEND_OK}
     */
    public void sendOrderlyMessage(String topic, OrderMessage message, String orderId) {
        if (orderId == null) {
            throw new IllegalArgumentException("orderId must not be null");
        }
        SendResult result = rocketMQTemplate.syncSendOrderly(topic, message, orderId);
        if (result == null || result.getSendStatus() != SendStatus.SEND_OK) {
            // A silently lost message would leave a gap in the order's sequence.
            throw new RuntimeException("消息发送失败: " + (result == null ? null : result.getSendStatus()));
        }
    }
}
// 消费端(顺序消费)
/**
 * Listener for {@code order_topic} registered with {@link ConsumeMode#ORDERLY}.
 */
@RocketMQMessageListener(
        topic = "order_topic",
        consumerGroup = "order_group",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderlyConsumer implements RocketMQListener<OrderMessage> {

    /**
     * Hands the message straight to {@code processOrder}. No extra synchronisation is done
     * here; the listener relies on the ordered consume mode declared on the class.
     */
    @Override
    public void onMessage(OrderMessage message) {
        processOrder(message);
    }
}