程序员必备的十大技能(进阶版)之分布式核心技术(四)

简介: 教程来源 http://rvtst.cn/ 本节详解分布式锁与消息中间件核心实践:Redis红锁(RedLock)通过多节点多数派机制保障高可用与互斥性;ZooKeeper锁基于临时顺序节点实现强一致性;消息队列则覆盖可靠性(同步发送+手动确认)、幂等性(Redis/DB去重)及顺序性(有序发送与消费)。

五、分布式锁

5.1 Redis分布式锁(红锁 RedLock)

public class RedisDistributedLock {

    private final StringRedisTemplate redisTemplate;
    private final String lockKeyPrefix = "lock:";
    private final long defaultExpireMs = 30000;  // 默认30秒过期

    // 获取锁(带重试)
    public boolean tryLock(String key, String value, long expireMs, int retryTimes, long retryIntervalMs) {
        for (int i = 0; i < retryTimes; i++) {
            if (tryLock(key, value, expireMs)) {
                return true;
            }
            try {
                Thread.sleep(retryIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    // 获取锁(SET NX EX)
    public boolean tryLock(String key, String value, long expireMs) {
        String fullKey = lockKeyPrefix + key;
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(fullKey, value, Duration.ofMillis(expireMs));
        return Boolean.TRUE.equals(success);
    }

    // 释放锁(Lua脚本保证原子性,只释放自己持有的锁)
    public boolean unlock(String key, String value) {
        String fullKey = lockKeyPrefix + key;
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "   return redis.call('del', KEYS[1]) " +
            "else " +
            "   return 0 " +
            "end";

        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(fullKey),
            value
        );
        return result != null && result == 1L;
    }

    // 锁续期(WatchDog机制)
    @Component
    public static class LockRenewalWatchDog {
        private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

        public void startRenewal(String key, String value, long expireMs) {
            ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
                String script = 
                    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                    "   return redis.call('expire', KEYS[1], ARGV[2]) " +
                    "else " +
                    "   return 0 " +
                    "end";

                Long result = redisTemplate.execute(
                    new DefaultRedisScript<>(script, Long.class),
                    Collections.singletonList(lockKeyPrefix + key),
                    value,
                    String.valueOf(expireMs / 1000)
                );

                if (result == 0) {
                    // 锁已被释放,取消续期
                    cancelRenewal(key);
                }
            }, expireMs / 3, expireMs / 3, TimeUnit.MILLISECONDS);  // 每 expireMs/3 续期一次

            renewalTasks.put(key, task);
        }

        public void cancelRenewal(String key) {
            ScheduledFuture<?> task = renewalTasks.remove(key);
            if (task != null) {
                task.cancel(false);
            }
        }
    }
}

// RedLock(多Redis节点)
public class RedLock {

    private final List<RedisDistributedLock> redisLocks;
    private final int quorum;  // 多数派节点数

    public RedLock(List<RedisTemplate> redisTemplates) {
        this.redisLocks = redisTemplates.stream()
            .map(template -> new RedisDistributedLock(template))
            .collect(Collectors.toList());
        this.quorum = redisLocks.size() / 2 + 1;
    }

    public boolean tryLock(String key, String value, long expireMs) {
        int successCount = 0;
        long startTime = System.currentTimeMillis();

        // 向所有Redis节点请求锁
        for (RedisDistributedLock lock : redisLocks) {
            if (lock.tryLock(key, value, expireMs)) {
                successCount++;
            }
        }

        long elapsed = System.currentTimeMillis() - startTime;

        // 成功获取多数派锁,且耗时小于过期时间
        if (successCount >= quorum && elapsed < expireMs) {
            return true;
        }

        // 获取失败,释放已获取的锁
        for (RedisDistributedLock lock : redisLocks) {
            lock.unlock(key, value);
        }
        return false;
    }
}

5.2 ZooKeeper分布式锁

public class ZooKeeperDistributedLock implements AutoCloseable {

    private final CuratorFramework client;
    private final String lockPath;
    private final String lockPrefix = "/locks/";
    private String currentLockNode;

    public ZooKeeperDistributedLock(String connectString, String lockName) {
        this.client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        this.client.start();
        this.lockPath = lockPrefix + lockName;

        try {
            // 创建持久节点
            Stat stat = client.checkExists().forPath(lockPath);
            if (stat == null) {
                client.create().creatingParentsIfNeeded().forPath(lockPath);
            }
        } catch (Exception e) {
            throw new RuntimeException("初始化锁失败", e);
        }
    }

    // 获取锁(顺序临时节点 + 监听前一个节点)
    public void lock() {
        try {
            // 创建顺序临时节点
            currentLockNode = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(lockPath + "/lock_");

            // 获取所有子节点
            List<String> children = client.getChildren().forPath(lockPath);
            Collections.sort(children);

            String currentNodeName = currentLockNode.substring(currentLockNode.lastIndexOf('/') + 1);
            int currentNodeIndex = children.indexOf(currentNodeName);

            if (currentNodeIndex == 0) {
                // 第一个节点,获得锁
                return;
            }

            // 监听前一个节点
            String previousNode = lockPath + "/" + children.get(currentNodeIndex - 1);
            CountDownLatch latch = new CountDownLatch(1);

            NodeCache nodeCache = new NodeCache(client, previousNode);
            nodeCache.start();
            nodeCache.getListenable().addListener(() -> {
                latch.countDown();
            });

            // 等待前一个节点释放
            latch.await();
            nodeCache.close();

        } catch (Exception e) {
            throw new RuntimeException("获取锁失败", e);
        }
    }

    // 尝试获取锁(非阻塞)
    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            // 创建顺序临时节点
            currentLockNode = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(lockPath + "/lock_");

            List<String> children = client.getChildren().forPath(lockPath);
            Collections.sort(children);

            String currentNodeName = currentLockNode.substring(currentLockNode.lastIndexOf('/') + 1);
            int currentNodeIndex = children.indexOf(currentNodeName);

            if (currentNodeIndex == 0) {
                return true;
            }

            // 不是第一个节点,等待一段时间
            String previousNode = lockPath + "/" + children.get(currentNodeIndex - 1);
            CountDownLatch latch = new CountDownLatch(1);

            NodeCache nodeCache = new NodeCache(client, previousNode);
            nodeCache.start();
            nodeCache.getListenable().addListener(() -> latch.countDown());

            boolean acquired = latch.await(timeout, unit);
            nodeCache.close();

            if (!acquired) {
                // 超时未获得锁,删除自己创建的节点
                client.delete().forPath(currentLockNode);
                return false;
            }
            return true;

        } catch (Exception e) {
            return false;
        }
    }

    // 释放锁
    public void unlock() {
        if (currentLockNode != null) {
            try {
                client.delete().forPath(currentLockNode);
                currentLockNode = null;
            } catch (Exception e) {
                log.error("释放锁失败", e);
            }
        }
    }

    @Override
    public void close() {
        unlock();
        client.close();
    }
}

六、消息中间件

6.1 消息队列的核心问题
消息丢失与可靠性保证

// 生产端保证(同步发送 + 回调确认)
@Component
public class ReliableMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 同步发送(确保Broker收到)
    public SendResult syncSend(String topic, Object message) {
        SendResult result = rocketMQTemplate.syncSend(topic, message, 3000);
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            // 记录失败消息,异步重试
            saveToRetryQueue(topic, message);
            throw new RuntimeException("消息发送失败: " + result.getSendStatus());
        }
        return result;
    }

    // 异步发送 + 回调
    public void asyncSend(String topic, Object message, MessageCallback callback) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                callback.onSuccess(sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error("消息发送失败", e);
                // 记录失败消息,重试
                retryMessage(topic, message);
                callback.onFailure(e);
            }
        });
    }
}

// Broker端保证(同步刷盘 + 主从同步)
// 配置:
// flushDiskType=SYNC_FLUSH(同步刷盘)
// brokerRole=SYNC_MASTER(同步双写)

// 消费端保证(手动确认)
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group", 
                         messageModel = MessageModel.CLUSTERING,
                         consumeMode = ConsumeMode.CONCURRENTLY)
public class OrderConsumer implements RocketMQListener<OrderMessage> {

    @Override
    public void onMessage(OrderMessage message) {
        try {
            // 处理业务逻辑
            processOrder(message);

            // 处理成功,返回CONSUME_SUCCESS(自动确认)
        } catch (Exception e) {
            log.error("消费失败", e);
            // 返回RECONSUME_LATER(稍后重试)
            throw new RuntimeException("消费失败,稍后重试");
        }
    }
}

消息幂等性处理

@Component
public class IdempotentConsumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String PROCESSED_FLAG_PREFIX = "msg:processed:";
    private static final long EXPIRE_SECONDS = 86400;  // 24小时

    // 基于Redis的幂等性判断
    public boolean isProcessed(String messageId) {
        String key = PROCESSED_FLAG_PREFIX + messageId;
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", Duration.ofSeconds(EXPIRE_SECONDS));
        return Boolean.FALSE.equals(success);
    }

    // 基于数据库唯一索引
    @Transactional
    public void consumeWithDbIdempotent(MessageRecord message) {
        try {
            // 插入消息消费记录(唯一索引:message_id)
            messageRecordMapper.insert(message);

            // 执行实际业务逻辑
            processBusiness(message);

        } catch (DuplicateKeyException e) {
            // 重复消息,跳过
            log.info("重复消息,messageId={}", message.getMessageId());
        }
    }

    // 基于业务键的幂等性(如订单号、交易流水号)
    public void consumeWithBusinessIdempotent(OrderMessage message) {
        String businessKey = "order:" + message.getOrderId();
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(businessKey, "processing", Duration.ofSeconds(30));

        if (Boolean.FALSE.equals(locked)) {
            log.info("订单正在处理中或已处理完成: {}", message.getOrderId());
            return;
        }

        try {
            // 检查是否已处理
            Order order = orderMapper.selectById(message.getOrderId());
            if (order != null && "PROCESSED".equals(order.getStatus())) {
                return;
            }

            // 处理业务
            processOrder(message);

        } finally {
            redisTemplate.delete(businessKey);
        }
    }
}

6.2 消息顺序保证

// RocketMQ顺序消息(消息队列选择器)
public class OrderlyMessageProducer {

    public void sendOrderlyMessage(String topic, OrderMessage message, String orderId) {
        // 使用orderId作为选择Key,相同orderId的消息进入同一个队列
        rocketMQTemplate.syncSendOrderly(topic, message, orderId);
    }
}

// 消费端(顺序消费)
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_group",
                         consumeMode = ConsumeMode.ORDERLY)  // 顺序消费模式
public class OrderlyConsumer implements RocketMQListener<OrderMessage> {

    @Override
    public void onMessage(OrderMessage message) {
        // 顺序消费,无需考虑并发问题
        processOrder(message);
    }
}

来源:
http://uklgy.cn/

相关文章
|
9天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
2805 16
|
6天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2399 5
|
21天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23557 14
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
8天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2100 2
|
2天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1382 1
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
15天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
3490 6
|
7天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1125 0

热门文章

最新文章