程序员必备的十大技能(进阶版)之分布式核心技术(五)

简介: 教程来源 http://yvyus.cn/ 本节系统讲解分布式核心能力:涵盖加权随机、最少活跃连接等负载均衡算法;Failover、Forking等容错策略;多级缓存与一致性保障;虚拟桶分片与平滑迁移;OpenTelemetry链路追踪及全链路日志透传,助力构建高可用、可观测的分布式系统。

七、负载均衡与容错

7.1 常见负载均衡算法

public interface LoadBalancer {
    ServiceInstance choose(List<ServiceInstance> instances);
}

// 随机(带权重)
public class WeightedRandomLoadBalancer implements LoadBalancer {
    @Override
    public ServiceInstance choose(List<ServiceInstance> instances) {
        int totalWeight = instances.stream().mapToInt(ServiceInstance::getWeight).sum();
        int random = ThreadLocalRandom.current().nextInt(totalWeight);

        int current = 0;
        for (ServiceInstance instance : instances) {
            current += instance.getWeight();
            if (random < current) {
                return instance;
            }
        }
        return instances.get(0);
    }
}

// 最少活跃连接(Dubbo)
public class LeastActiveLoadBalancer implements LoadBalancer {

    private final Map<ServiceInstance, AtomicInteger> activeCounts = new ConcurrentHashMap<>();

    @Override
    public ServiceInstance choose(List<ServiceInstance> instances) {
        ServiceInstance best = null;
        int leastActive = Integer.MAX_VALUE;

        for (ServiceInstance instance : instances) {
            int active = activeCounts.computeIfAbsent(instance, k -> new AtomicInteger(0)).get();
            if (active < leastActive) {
                leastActive = active;
                best = instance;
            }
        }
        return best;
    }

    // 调用前增加计数
    public void beforeInvoke(ServiceInstance instance) {
        activeCounts.get(instance).incrementAndGet();
    }

    // 调用后减少计数
    public void afterInvoke(ServiceInstance instance) {
        activeCounts.get(instance).decrementAndGet();
    }
}

7.2 客户端容错策略

@Component
public class FaultTolerantInvoker {

    // Failover:失败自动切换(默认策略)
    @Retryable(value = {RemoteException.class}, maxAttempts = 3)
    public Object failoverInvoke(List<ServiceInstance> instances) {
        for (ServiceInstance instance : instances) {
            try {
                return invoke(instance);
            } catch (RemoteException e) {
                log.warn("调用失败,切换到下一个节点: {}", instance);
                // 记录失败节点,标记为不可用
                markAsFailed(instance);
            }
        }
        throw new RuntimeException("所有节点均调用失败");
    }

    // Failfast:快速失败(非幂等操作)
    public Object failfastInvoke(ServiceInstance instance) {
        try {
            return invoke(instance);
        } catch (RemoteException e) {
            throw new BusinessException("调用失败,请稍后重试", e);
        }
    }

    // Failsafe:失败安全(记录日志,返回空结果)
    public Object failsafeInvoke(ServiceInstance instance) {
        try {
            return invoke(instance);
        } catch (Exception e) {
            log.error("调用失败,忽略错误", e);
            return null;
        }
    }

    // Forking:并行调用多个节点,取最快返回的结果
    public Object forkingInvoke(List<ServiceInstance> instances, long timeoutMs) {
        ExecutorService executor = Executors.newFixedThreadPool(instances.size());
        List<CompletableFuture<Object>> futures = new ArrayList<>();

        for (ServiceInstance instance : instances) {
            CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> 
                invoke(instance), executor);
            futures.add(future);
        }

        try {
            // 取第一个完成的结果
            return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
                .get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException("并行调用全部失败", e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Broadcast:广播调用(所有节点),收集所有结果
    public List<Object> broadcastInvoke(List<ServiceInstance> instances) {
        return instances.parallelStream()
            .map(this::invoke)
            .collect(Collectors.toList());
    }
}

八、分布式存储

8.1 分布式缓存

// 多级缓存(本地缓存 + Redis)
@Component
public class MultiLevelCache {

    // Caffeine本地缓存
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .recordStats()
        .build();

    @Autowired
    private StringRedisTemplate redisTemplate;

    public Object get(String key) {
        // L1: 本地缓存
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }

        // L2: Redis
        String json = redisTemplate.opsForValue().get(key);
        if (json != null) {
            value = JSON.parseObject(json, Object.class);
            localCache.put(key, value);
            return value;
        }

        return null;
    }

    // 缓存一致性问题:使用Redis Pub/Sub通知其他节点失效本地缓存
    public void put(String key, Object value) {
        // 写入Redis
        redisTemplate.opsForValue().set(key, JSON.toJSONString(value), 3600, TimeUnit.SECONDS);

        // 发送缓存失效消息(通知其他节点清除本地缓存)
        redisTemplate.convertAndSend("cache:invalidate", key);

        // 清除当前节点的本地缓存
        localCache.invalidate(key);
    }

    // 监听缓存失效消息
    @Component
    public static class CacheInvalidateListener implements MessageListener {

        @Autowired
        private Cache<String, Object> localCache;

        @Override
        public void onMessage(Message message, byte[] pattern) {
            String key = new String(message.getBody());
            localCache.invalidate(key);
            log.debug("收到缓存失效通知,清除本地缓存: {}", key);
        }
    }
}

8.2 数据分片与迁移

// 虚拟桶分片(一致性Hash的变种)
public class VirtualBucketSharding {

    private final int bucketCount = 1024;  // 虚拟桶数量
    private final int replicaCount = 3;    // 每个数据副本数

    private final Map<Integer, List<String>> bucketLocation = new ConcurrentHashMap<>();
    private final ConsistentHashRouter router;

    public VirtualBucketSharding(List<String> storageNodes) {
        // 将虚拟桶分配到物理节点
        for (int i = 0; i < bucketCount; i++) {
            String node = storageNodes.get(i % storageNodes.size());
            bucketLocation.computeIfAbsent(i, k -> new ArrayList<>()).add(node);
        }

        // 一致性哈希用于负载均衡
        this.router = new ConsistentHashRouter(storageNodes, 150);
    }

    // 计算key所属的虚拟桶
    public int getBucketId(String key) {
        int hash = Hashing.murmur3_32().hashBytes(key.getBytes()).asInt();
        return Math.abs(hash) % bucketCount;
    }

    // 获取key所在的节点(主节点)
    public String getPrimaryNode(String key) {
        int bucketId = getBucketId(key);
        return bucketLocation.get(bucketId).get(0);
    }

    // 数据迁移(扩容时)
    public void rebalance(List<String> newNodes) {
        // 1. 标记新加入的节点
        Set<String> newNodeSet = new HashSet<>(newNodes);

        // 2. 重新计算桶分配
        Map<Integer, List<String>> newBucketLocation = new HashMap<>();
        for (int i = 0; i < bucketCount; i++) {
            String newNode = newNodes.get(i % newNodes.size());
            newBucketLocation.computeIfAbsent(i, k -> new ArrayList<>()).add(newNode);
        }

        // 3. 找出需要迁移的桶
        Map<Integer, String> migrationPlan = new HashMap<>();
        for (int i = 0; i < bucketCount; i++) {
            String oldNode = bucketLocation.get(i).get(0);
            String newNode = newBucketLocation.get(i).get(0);
            if (!oldNode.equals(newNode)) {
                migrationPlan.put(i, newNode);
            }
        }

        // 4. 执行数据迁移(双写 + 异步迁移)
        for (Map.Entry<Integer, String> entry : migrationPlan.entrySet()) {
            int bucketId = entry.getKey();
            String targetNode = entry.getValue();
            asyncMigrateBucket(bucketId, targetNode);
        }

        // 5. 切换路由表
        this.bucketLocation.clear();
        this.bucketLocation.putAll(newBucketLocation);
    }

    private void asyncMigrateBucket(int bucketId, String targetNode) {
        // 1. 在目标节点创建临时分区
        // 2. 从源节点读取数据
        // 3. 写入目标节点
        // 4. 验证数据完整性
        // 5. 删除源节点数据
        // 6. 更新路由表
    }
}

九、链路追踪与可观测性

9.1 OpenTelemetry集成

@Configuration
public class TracingConfig {

    @Bean
    public OpenTelemetry openTelemetry() {
        // 导出到Jaeger
        JaegerGrpcSpanExporter jaegerExporter = JaegerGrpcSpanExporter.builder()
            .setEndpoint("http://localhost:14250")
            .build();

        // 配置采样率
        Sampler sampler = Sampler.traceIdRatioBased(0.1);  // 10%采样

        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(SimpleSpanProcessor.create(jaegerExporter))
            .setSampler(sampler)
            .build();

        return OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .buildAndRegisterGlobal();
    }

    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer("com.example.service");
    }
}

// 手动埋点
@Service
public class OrderService {

    @Autowired
    private Tracer tracer;

    public Order createOrder(OrderDTO dto) {
        // 创建Span
        Span span = tracer.spanBuilder("createOrder")
            .setSpanKind(SpanKind.SERVER)
            .setAttribute("order.userId", dto.getUserId())
            .setAttribute("order.amount", dto.getAmount())
            .startSpan();

        try (Scope scope = span.makeCurrent()) {
            // 业务逻辑
            Order order = doCreateOrder(dto);

            span.setStatus(StatusCode.OK);
            return order;

        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }
}

// 自动拦截(@WithSpan)
@Component
public class InventoryClient {

    @WithSpan(kind = SpanKind.CLIENT)
    public boolean decreaseStock(Long productId, Integer quantity) {
        // 调用库存服务,Span会自动传播
        return restTemplate.postForObject(
            "http://inventory-service/decrease",
            new DecreaseStockRequest(productId, quantity),
            Boolean.class
        );
    }
}

9.2 全链路日志追踪

// MDC + TraceId传播
@Configuration
public class TraceIdInterceptor implements WebMvcConfigurer {

    // 生成TraceId
    public static final String TRACE_ID_KEY = "traceId";

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HandlerInterceptor() {
            @Override
            public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
                String traceId = request.getHeader("X-Trace-Id");
                if (traceId == null || traceId.isEmpty()) {
                    traceId = generateTraceId();
                }
                MDC.put(TRACE_ID_KEY, traceId);
                response.setHeader("X-Trace-Id", traceId);
                return true;
            }

            @Override
            public void afterCompletion(HttpServletRequest request, HttpServletResponse response, 
                                       Object handler, Exception ex) {
                MDC.clear();
            }
        });
    }

    private String generateTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}

// RestTemplate自动传播TraceId
@Component
public class TraceIdRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, 
                                        ClientHttpRequestExecution execution) throws IOException {
        String traceId = MDC.get(TraceIdInterceptor.TRACE_ID_KEY);
        if (traceId != null) {
            request.getHeaders().add("X-Trace-Id", traceId);
        }
        return execution.execute(request, body);
    }
}

// 线程池传播
public class TraceableThreadPoolExecutor extends ThreadPoolExecutor {

    @Override
    public void execute(Runnable command) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        super.execute(() -> {
            try {
                if (context != null) {
                    MDC.setContextMap(context);
                }
                command.run();
            } finally {
                MDC.clear();
            }
        });
    }
}

// 日志配置(logback-spring.xml)
// <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %X{traceId} %-5level %logger{36} - %msg%n</pattern>

常见问题排查清单

问题分类:
  服务发现:
    - 检查注册中心连接状态
    - 查看服务心跳续约日志
    - 确认网络策略(安全组、防火墙)

  分布式事务:
    - 查看TCC/事务消息日志状态
    - 检查补偿任务执行情况
    - 确认幂等性控制是否生效

  分布式锁:
    - 检查锁超时时间是否合理
    - 查看锁持有者是否异常终止
    - 确认Redis/ZK集群状态

  消息队列:
    - 查看消费积压情况
    - 检查死信队列
    - 确认消费者线程池配置

  性能问题:
    - 分析调用链瓶颈
    - 检查线程池配置
    - 查看数据库连接池状态

来源:
http://zlpow.cn/

相关文章
|
8天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
2763 15
|
6天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2302 4
|
21天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23554 13
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
8天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2055 1
|
2天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1306 1
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
14天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
3456 5
|
7天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1095 0