七、负载均衡与容错
7.1 常见负载均衡算法
/**
 * Strategy for selecting one service instance out of a list of candidates.
 */
public interface LoadBalancer {
/**
 * Picks the instance that should receive the next call.
 *
 * @param instances candidate instances to choose from
 * @return the selected instance
 */
ServiceInstance choose(List<ServiceInstance> instances);
}
// 随机(带权重)
/**
 * Load balancer that picks an instance at random, where the chance of picking an instance
 * is proportional to the value returned by its {@code getWeight()}.
 */
public class WeightedRandomLoadBalancer implements LoadBalancer {

    /**
     * Chooses an instance by weighted random selection.
     *
     * <p>Negative weights are treated as zero. When no instance has a positive weight the
     * choice falls back to a uniform random pick, so the call can still be routed instead of
     * failing on a zero bound.
     *
     * @param instances candidate instances; must not be empty
     * @return the chosen instance
     * @throws IllegalArgumentException if {@code instances} is empty
     */
    @Override
    public ServiceInstance choose(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("instances must not be empty");
        }

        // Accumulate in a long so that many large weights cannot overflow the total.
        long totalWeight = 0L;
        for (ServiceInstance instance : instances) {
            totalWeight += Math.max(0, instance.getWeight());
        }

        ThreadLocalRandom random = ThreadLocalRandom.current();
        if (totalWeight <= 0) {
            // All weights are zero (or negative): every instance is equally eligible.
            return instances.get(random.nextInt(instances.size()));
        }

        // Walk the cumulative weights until the random target falls inside an instance's slice.
        long target = random.nextLong(totalWeight);
        long cumulative = 0L;
        for (ServiceInstance instance : instances) {
            cumulative += Math.max(0, instance.getWeight());
            if (target < cumulative) {
                return instance;
            }
        }
        // Only reachable if weights change while iterating; keep a deterministic fallback.
        return instances.get(0);
    }
}
// 最少活跃连接(Dubbo)
/**
 * Load balancer that prefers the instance with the fewest in-flight calls.
 *
 * <p>Callers are expected to bracket each call with {@link #beforeInvoke(ServiceInstance)} and
 * {@link #afterInvoke(ServiceInstance)} so the per-instance counters reflect active calls.
 */
public class LeastActiveLoadBalancer implements LoadBalancer {

    /** In-flight call count per instance. */
    private final Map<ServiceInstance, AtomicInteger> activeCounts = new ConcurrentHashMap<>();

    /**
     * Returns the instance with the lowest active count; on a tie the earliest one in the
     * list wins.
     *
     * @param instances candidate instances
     * @return the least active instance, or {@code null} when {@code instances} is empty
     */
    @Override
    public ServiceInstance choose(List<ServiceInstance> instances) {
        ServiceInstance best = null;
        int leastActive = Integer.MAX_VALUE;
        for (ServiceInstance instance : instances) {
            int active = counterFor(instance).get();
            if (active < leastActive) {
                leastActive = active;
                best = instance;
            }
        }
        return best;
    }

    /**
     * Increments the active count before a call is made.
     *
     * @param instance the instance about to be invoked
     */
    public void beforeInvoke(ServiceInstance instance) {
        // Create the counter on demand: the instance may not have passed through choose().
        counterFor(instance).incrementAndGet();
    }

    /**
     * Decrements the active count after a call has finished.
     *
     * @param instance the instance that was invoked
     */
    public void afterInvoke(ServiceInstance instance) {
        AtomicInteger counter = activeCounts.get(instance);
        if (counter == null) {
            // Nothing was ever counted for this instance, so there is nothing to release.
            return;
        }
        // Never drop below zero: an unmatched afterInvoke must not make the instance look
        // less loaded than an idle one.
        counter.updateAndGet(current -> current > 0 ? current - 1 : 0);
    }

    /** Returns the counter for the instance, creating it at zero if absent. */
    private AtomicInteger counterFor(ServiceInstance instance) {
        return activeCounts.computeIfAbsent(instance, k -> new AtomicInteger(0));
    }
}
7.2 客户端容错策略
/**
 * Client-side fault-tolerance strategies for remote calls: failover, failfast, failsafe,
 * forking and broadcast.
 */
@Component
public class FaultTolerantInvoker {

    /**
     * Failover: tries the instances in order and returns the first successful result.
     *
     * <p>Each failed instance is logged and passed to {@code markAsFailed}.
     *
     * <p>NOTE(review): this method catches {@code RemoteException} itself and surfaces a
     * {@code RuntimeException}, so the {@code @Retryable} filter on {@code RemoteException}
     * may never match. Confirm whether annotation-driven retry is intended on top of the
     * manual node switching.
     *
     * @param instances candidate instances, tried in iteration order
     * @return the result of the first successful call
     * @throws RuntimeException if every instance fails; the individual failures are attached
     *     as suppressed exceptions
     */
    @Retryable(value = {RemoteException.class}, maxAttempts = 3)
    public Object failoverInvoke(List<ServiceInstance> instances) {
        List<RemoteException> failures = new ArrayList<>();
        for (ServiceInstance instance : instances) {
            try {
                return invoke(instance);
            } catch (RemoteException e) {
                // Pass the exception as the trailing argument so the failure reason is logged.
                log.warn("调用失败,切换到下一个节点: {}", instance, e);
                // Record the failed node so it is marked unavailable.
                markAsFailed(instance);
                failures.add(e);
            }
        }
        RuntimeException allFailed = new RuntimeException("所有节点均调用失败");
        // Attached as suppressed rather than as the cause, so any exception classification
        // that inspects the cause chain (e.g. retry infrastructure) keeps behaving as before.
        for (RemoteException failure : failures) {
            allFailed.addSuppressed(failure);
        }
        throw allFailed;
    }

    /**
     * Failfast: a single attempt, intended for non-idempotent operations.
     *
     * @param instance the instance to call
     * @return the call result
     * @throws BusinessException if the call fails with a {@code RemoteException}
     */
    public Object failfastInvoke(ServiceInstance instance) {
        try {
            return invoke(instance);
        } catch (RemoteException e) {
            throw new BusinessException("调用失败,请稍后重试", e);
        }
    }

    /**
     * Failsafe: a deliberate best-effort call that logs any failure and returns {@code null}.
     *
     * @param instance the instance to call
     * @return the call result, or {@code null} if the call threw
     */
    public Object failsafeInvoke(ServiceInstance instance) {
        try {
            return invoke(instance);
        } catch (Exception e) {
            log.error("调用失败,忽略错误", e);
            return null;
        }
    }

    /**
     * Forking: calls all instances in parallel and returns the first <em>successful</em>
     * result.
     *
     * <p>A failure of one instance does not abort the call while others are still running;
     * the call fails only when every instance has failed or the timeout elapses.
     *
     * @param instances instances to call in parallel; must not be empty
     * @param timeoutMs maximum time to wait for a successful result, in milliseconds
     * @return the first successful result
     * @throws IllegalArgumentException if {@code instances} is empty
     * @throws RuntimeException if all calls fail, the wait times out, or the thread is
     *     interrupted
     */
    public Object forkingInvoke(List<ServiceInstance> instances, long timeoutMs) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("instances must not be empty");
        }
        ExecutorService executor = Executors.newFixedThreadPool(instances.size());
        CompletableFuture<Object> firstSuccess = new CompletableFuture<>();
        // Number of calls that have not failed yet; the last failure fails the aggregate.
        AtomicInteger remaining = new AtomicInteger(instances.size());
        try {
            for (ServiceInstance instance : instances) {
                CompletableFuture.supplyAsync(() -> invoke(instance), executor)
                    .whenComplete((result, error) -> {
                        if (error == null) {
                            // Only the first completion wins; later ones are ignored.
                            firstSuccess.complete(result);
                        } else if (remaining.decrementAndGet() == 0) {
                            firstSuccess.completeExceptionally(error);
                        }
                    });
            }
            return firstSuccess.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("并行调用全部失败", e);
        } catch (Exception e) {
            throw new RuntimeException("并行调用全部失败", e);
        } finally {
            // Cancel the calls that are still running once a result (or failure) is known.
            executor.shutdownNow();
        }
    }

    /**
     * Broadcast: calls every instance through a parallel stream and collects all results.
     *
     * <p>An exception thrown by any single call propagates out of this method, in which case
     * no partial result list is returned.
     *
     * @param instances instances to call
     * @return one result per instance
     */
    public List<Object> broadcastInvoke(List<ServiceInstance> instances) {
        return instances.parallelStream()
            .map(this::invoke)
            .collect(Collectors.toList());
    }
}
八、分布式存储
8.1 分布式缓存
// 多级缓存(本地缓存 + Redis)
/**
 * Two-level cache: an in-process Caffeine cache (L1) in front of Redis (L2).
 *
 * <p>Writes go to Redis and publish the key on the {@code cache:invalidate} channel so that
 * other nodes can drop their local copy.
 */
@Component
public class MultiLevelCache {

    /** Local Caffeine cache: at most 10000 entries, each expiring 60 seconds after write. */
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .recordStats()
        .build();

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Looks the key up in the local cache first and then in Redis.
     *
     * <p>A value found in Redis is parsed from JSON and stored in the local cache before it
     * is returned.
     *
     * @param key cache key
     * @return the cached value, or {@code null} when neither level holds the key
     */
    public Object get(String key) {
        // L1: local cache
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        // L2: Redis
        String json = redisTemplate.opsForValue().get(key);
        if (json != null) {
            value = JSON.parseObject(json, Object.class);
            localCache.put(key, value);
            return value;
        }
        return null;
    }

    /**
     * Stores the value in Redis with a 3600-second TTL and invalidates local copies.
     *
     * <p>The key is published on {@code cache:invalidate} to notify other nodes, and the
     * local entry on this node is removed directly.
     *
     * @param key cache key
     * @param value value to store; serialized to JSON
     */
    public void put(String key, Object value) {
        // Write to Redis.
        redisTemplate.opsForValue().set(key, JSON.toJSONString(value), 3600, TimeUnit.SECONDS);
        // Publish the invalidation so other nodes clear their local cache.
        redisTemplate.convertAndSend("cache:invalidate", key);
        // Clear the local cache of the current node.
        localCache.invalidate(key);
    }

    /**
     * Listens for cache invalidation messages and evicts the key from the local cache.
     *
     * <p>NOTE(review): the subscription of this listener to the {@code cache:invalidate}
     * channel is not shown here; confirm it is registered with a listener container.
     */
    @Component
    public static class CacheInvalidateListener implements MessageListener {

        // Inject the owning cache rather than a Cache bean: localCache is a private field of
        // MultiLevelCache and is not exposed as a bean, so only this reference reaches the
        // cache instance that get()/put() actually use.
        @Autowired
        private MultiLevelCache multiLevelCache;

        /**
         * Evicts the key carried in the message body from the local cache.
         *
         * @param message invalidation message whose body is the cache key
         * @param pattern pattern that matched the channel; unused
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // Assumes the publisher serializes keys as UTF-8 — TODO confirm serializer config.
            String key = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            multiLevelCache.localCache.invalidate(key);
            log.debug("收到缓存失效通知,清除本地缓存: {}", key);
        }
    }
}
8.2 数据分片与迁移
// 虚拟桶分片(一致性Hash的变种)
/**
 * Virtual-bucket sharding: keys hash into a fixed number of virtual buckets, and each bucket
 * is assigned to a physical storage node.
 *
 * <p>NOTE(review): {@code replicaCount} and {@code router} are initialized but not used by
 * any method shown here; each bucket currently holds a single node.
 */
public class VirtualBucketSharding {

    /** Number of virtual buckets. */
    private final int bucketCount = 1024;
    /** Number of replicas per piece of data (see class note: currently unused). */
    private final int replicaCount = 3;
    /** Routing table: bucket id to owning nodes; index 0 is the primary. */
    private final Map<Integer, List<String>> bucketLocation = new ConcurrentHashMap<>();
    private final ConsistentHashRouter router;

    /**
     * Builds the routing table by assigning buckets to nodes round-robin.
     *
     * @param storageNodes physical storage nodes; must not be empty
     * @throws IllegalArgumentException if {@code storageNodes} is empty
     */
    public VirtualBucketSharding(List<String> storageNodes) {
        requireNonEmpty(storageNodes, "storageNodes");
        // Assign the virtual buckets to physical nodes.
        bucketLocation.putAll(assignBuckets(storageNodes));
        // Consistent hashing, described in the original as used for load balancing.
        this.router = new ConsistentHashRouter(storageNodes, 150);
    }

    /**
     * Computes the virtual bucket a key belongs to.
     *
     * @param key the data key
     * @return a bucket id in {@code [0, bucketCount)}
     */
    public int getBucketId(String key) {
        // Hash the UTF-8 bytes so every node maps a key to the same bucket regardless of its
        // platform default charset.
        int hash = Hashing.murmur3_32()
            .hashBytes(key.getBytes(java.nio.charset.StandardCharsets.UTF_8))
            .asInt();
        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still
        // negative and would yield a negative bucket id. For every other hash this gives the
        // same bucket as Math.abs(hash) % bucketCount.
        return Math.abs(hash % bucketCount);
    }

    /**
     * Returns the primary node for the key.
     *
     * @param key the data key
     * @return the first node registered for the key's bucket
     */
    public String getPrimaryNode(String key) {
        int bucketId = getBucketId(key);
        return bucketLocation.get(bucketId).get(0);
    }

    /**
     * Recomputes the bucket assignment for a new node list (e.g. when scaling out), starts a
     * migration for every bucket whose primary changes, and then switches the routing table.
     *
     * <p>NOTE(review): the routing table is switched right after the migrations are started;
     * {@code asyncMigrateBucket} is a stub here, so whether reads stay correct during the
     * migration cannot be confirmed from this code.
     *
     * @param newNodes the complete new node list; must not be empty
     * @throws IllegalArgumentException if {@code newNodes} is empty
     */
    public void rebalance(List<String> newNodes) {
        requireNonEmpty(newNodes, "newNodes");

        // 1. Recompute the bucket assignment for the new node list.
        Map<Integer, List<String>> newBucketLocation = assignBuckets(newNodes);

        // 2. Find the buckets whose primary node changes.
        Map<Integer, String> migrationPlan = new HashMap<>();
        for (int i = 0; i < bucketCount; i++) {
            String oldNode = bucketLocation.get(i).get(0);
            String newNode = newBucketLocation.get(i).get(0);
            if (!oldNode.equals(newNode)) {
                migrationPlan.put(i, newNode);
            }
        }

        // 3. Start the data migration (double write + asynchronous migration).
        for (Map.Entry<Integer, String> entry : migrationPlan.entrySet()) {
            asyncMigrateBucket(entry.getKey(), entry.getValue());
        }

        // 4. Switch the routing table. Both tables contain exactly the ids 0..bucketCount-1,
        //    so overwriting in place replaces every entry; clearing first would expose an
        //    empty table to concurrent getPrimaryNode() calls.
        this.bucketLocation.putAll(newBucketLocation);
    }

    /** Assigns every bucket id to a node round-robin and returns the resulting table. */
    private Map<Integer, List<String>> assignBuckets(List<String> nodes) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < bucketCount; i++) {
            List<String> owners = new ArrayList<>();
            owners.add(nodes.get(i % nodes.size()));
            assignment.put(i, owners);
        }
        return assignment;
    }

    /** Rejects an empty node list, which would otherwise fail with a division by zero. */
    private static void requireNonEmpty(List<String> nodes, String name) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
    }

    /**
     * Migrates one bucket to the target node. Not implemented yet; the planned steps are
     * listed in the body.
     *
     * @param bucketId id of the bucket to migrate
     * @param targetNode node that becomes the bucket's new owner
     */
    private void asyncMigrateBucket(int bucketId, String targetNode) {
        // 1. Create a temporary partition on the target node
        // 2. Read the data from the source node
        // 3. Write the data to the target node
        // 4. Verify data integrity
        // 5. Delete the data on the source node
        // 6. Update the routing table
    }
}
九、链路追踪与可观测性
9.1 OpenTelemetry集成
/**
 * Spring configuration that wires the OpenTelemetry SDK with a Jaeger gRPC exporter and
 * exposes a {@link Tracer} bean.
 */
@Configuration
public class TracingConfig {

    /**
     * Builds the OpenTelemetry SDK and registers it as the global instance.
     *
     * <p>NOTE(review): spans go through {@code SimpleSpanProcessor} to a hard-coded
     * {@code localhost} endpoint; confirm this is intended outside local development.
     *
     * @return the configured OpenTelemetry instance
     */
    @Bean
    public OpenTelemetry openTelemetry() {
        return OpenTelemetrySdk.builder()
            .setTracerProvider(jaegerTracerProvider())
            .buildAndRegisterGlobal();
    }

    /**
     * Exposes the tracer used for manual instrumentation.
     *
     * @param openTelemetry the configured OpenTelemetry instance
     * @return the tracer obtained for the name {@code com.example.service}
     */
    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer("com.example.service");
    }

    /** Creates the tracer provider: Jaeger export with a trace-id ratio sampler of 0.1. */
    private static SdkTracerProvider jaegerTracerProvider() {
        // Export to Jaeger.
        JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder()
            .setEndpoint("http://localhost:14250")
            .build();
        return SdkTracerProvider.builder()
            .addSpanProcessor(SimpleSpanProcessor.create(exporter))
            // Sample 10% of traces.
            .setSampler(Sampler.traceIdRatioBased(0.1))
            .build();
    }
}
// 手动埋点
@Service
public class OrderService {
@Autowired
private Tracer tracer;
public Order createOrder(OrderDTO dto) {
// 创建Span
Span span = tracer.spanBuilder("createOrder")
.setSpanKind(SpanKind.SERVER)
.setAttribute("order.userId", dto.getUserId())
.setAttribute("order.amount", dto.getAmount())
.startSpan();
try (Scope scope = span.makeCurrent()) {
// 业务逻辑
Order order = doCreateOrder(dto);
span.setStatus(StatusCode.OK);
return order;
} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
span.end();
}
}
}
// 自动拦截(@WithSpan)
/**
 * Client for the inventory service, instrumented through {@code @WithSpan}.
 */
@Component
public class InventoryClient {

    /**
     * Asks the inventory service to decrease the stock of a product.
     *
     * <p>NOTE(review): the original comment states that the span propagates automatically;
     * that depends on how {@code restTemplate} is instrumented, which is not shown here.
     *
     * @param productId id of the product
     * @param quantity amount to subtract from the stock
     * @return {@code true} only when the service answered {@code true}; an empty response
     *     body counts as failure
     */
    @WithSpan(kind = SpanKind.CLIENT)
    public boolean decreaseStock(Long productId, Integer quantity) {
        Boolean decreased = restTemplate.postForObject(
            "http://inventory-service/decrease",
            new DecreaseStockRequest(productId, quantity),
            Boolean.class
        );
        // postForObject may return null (empty body); unboxing it directly would throw a
        // NullPointerException, so treat a missing answer as "not decreased".
        return Boolean.TRUE.equals(decreased);
    }
}
9.2 全链路日志追踪
// MDC + TraceId传播
/**
 * Registers an MVC interceptor that binds a trace id to the logging MDC for each request and
 * echoes it in the {@code X-Trace-Id} response header.
 */
@Configuration
public class TraceIdInterceptor implements WebMvcConfigurer {

    /** MDC key under which the trace id is stored. */
    public static final String TRACE_ID_KEY = "traceId";

    /** Upper bound on the length of a trace id accepted from a request header. */
    private static final int MAX_TRACE_ID_LENGTH = 128;

    /**
     * Adds the trace-id interceptor to the registry.
     *
     * @param registry the interceptor registry to add to
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HandlerInterceptor() {
            @Override
            public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
                String traceId = request.getHeader("X-Trace-Id");
                // The header is client-controlled and ends up in log lines and a response
                // header, so only a bounded, plain token is accepted. Anything else is
                // replaced by a freshly generated id.
                if (!isSafeTraceId(traceId)) {
                    traceId = generateTraceId();
                }
                MDC.put(TRACE_ID_KEY, traceId);
                response.setHeader("X-Trace-Id", traceId);
                return true;
            }

            @Override
            public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                    Object handler, Exception ex) {
                // Clear the MDC so the trace id does not leak into the next request handled
                // by this thread.
                MDC.clear();
            }
        });
    }

    /**
     * Checks whether a client-supplied trace id is safe to log and echo back: non-empty, at
     * most {@code MAX_TRACE_ID_LENGTH} characters, and made only of ASCII letters, digits,
     * {@code '-'}, {@code '_'}, {@code '.'} and {@code ':'}.
     */
    private static boolean isSafeTraceId(String candidate) {
        if (candidate == null || candidate.isEmpty() || candidate.length() > MAX_TRACE_ID_LENGTH) {
            return false;
        }
        for (int i = 0; i < candidate.length(); i++) {
            char c = candidate.charAt(i);
            boolean allowed = (c >= '0' && c <= '9')
                || (c >= 'a' && c <= 'z')
                || (c >= 'A' && c <= 'Z')
                || c == '-' || c == '_' || c == '.' || c == ':';
            if (!allowed) {
                return false;
            }
        }
        return true;
    }

    /** Generates a new trace id: a random UUID with the dashes removed. */
    private String generateTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}
// RestTemplate自动传播TraceId
/**
 * RestTemplate interceptor that forwards the current trace id to downstream services.
 */
@Component
public class TraceIdRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    /**
     * Adds the {@code X-Trace-Id} header when the MDC holds a trace id, then continues the
     * request execution.
     *
     * @param request the outgoing request
     * @param body the request body
     * @param execution the remaining execution chain
     * @return the response produced by the rest of the chain
     * @throws IOException if the execution fails with an I/O error
     */
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
            ClientHttpRequestExecution execution) throws IOException {
        String currentTraceId = MDC.get(TraceIdInterceptor.TRACE_ID_KEY);
        if (currentTraceId == null) {
            // No trace id bound to this thread: send the request unchanged.
            return execution.execute(request, body);
        }
        request.getHeaders().add("X-Trace-Id", currentTraceId);
        return execution.execute(request, body);
    }
}
// 线程池传播
/**
 * Thread pool that carries the submitting thread's MDC (e.g. the trace id) into the thread
 * that runs the task.
 *
 * <p>NOTE(review): no constructor is shown in this snippet; {@code ThreadPoolExecutor} has no
 * no-arg constructor, so the real class needs one that delegates to {@code super(...)}.
 */
public class TraceableThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Executes the task with the submitter's MDC context installed for its duration.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is null
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            // Fail at submission time; otherwise the wrapper below would hide the null until
            // it is run on a pool thread.
            throw new NullPointerException("command");
        }
        // Snapshot the submitter's MDC; this may be null when nothing is set.
        Map<String, String> context = MDC.getCopyOfContextMap();
        super.execute(() -> {
            // Remember what the executing thread had. With a caller-runs rejection policy
            // the task runs on the submitting thread, whose MDC must survive the task.
            Map<String, String> previous = MDC.getCopyOfContextMap();
            try {
                if (context != null) {
                    MDC.setContextMap(context);
                } else {
                    // The submitter had no context: do not let a stale one leak into the task.
                    MDC.clear();
                }
                command.run();
            } finally {
                if (previous != null) {
                    MDC.setContextMap(previous);
                } else {
                    MDC.clear();
                }
            }
        });
    }
}
// 日志配置(logback-spring.xml)
// <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %X{traceId} %-5level %logger{36} - %msg%n</pattern>
常见问题排查清单
问题分类:
服务发现:
- 检查注册中心连接状态
- 查看服务心跳续约日志
- 确认网络策略(安全组、防火墙)
分布式事务:
- 查看TCC/事务消息日志状态
- 检查补偿任务执行情况
- 确认幂等性控制是否生效
分布式锁:
- 检查锁超时时间是否合理
- 查看锁持有者是否异常终止
- 确认Redis/ZK集群状态
消息队列:
- 查看消费积压情况
- 检查死信队列
- 确认消费者线程池配置
性能问题:
- 分析调用链瓶颈
- 检查线程池配置
- 查看数据库连接池状态