在单体应用向微服务架构演进的今天,分布式系统已经成为大型互联网公司的标配。分布式系统的核心挑战在于:如何让多个独立的节点协同工作,对外表现得像一个整体。这涉及到数据一致性、服务发现、负载均衡、分布式事务、分布式锁、消息队列、链路追踪等一系列核心技术。
本文将围绕“分布式核心技术”这一主题,从分布式理论基础、服务治理、分布式一致性协议、分布式事务、分布式锁、消息中间件、负载均衡与容错、分布式存储、链路追踪与可观测性九个维度,深入剖析分布式系统的底层原理与实践。
一、分布式理论基础
1.1 CAP定理
CAP定理由Eric Brewer提出,指出分布式系统无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance),最多只能满足其中两个。
┌─────────────┐
│ Consistency │
│ (C) │
└──────┬───────┘
│
CP系统 ←───────┼───────→ AP系统
(ZooKeeper/ │ (Eureka/
HBase) │ Cassandra)
│
┌──────┴───────┐
│ Availability │
│ (A) │
└──────────────┘
Partition Tolerance (P) 是分布式系统的必选项

关键理解:网络分区在分布式系统中无法避免,因此P是必选项;真正需要权衡的是:发生分区时,优先保证一致性(CP)还是可用性(AP)。
1.2 BASE理论
BASE是对CAP中AP的延伸,强调通过最终一致性来获得高可用。
BA:Basically Available(基本可用)—— 允许部分功能降级
S:Soft State(软状态)—— 允许中间状态存在
E:Eventually Consistent(最终一致性)—— 经过一段时间后,数据最终达到一致
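1.3 一致性模型
按约束由强到弱,常见的一致性模型有:强一致性(线性一致性,写操作完成后,任何后续读都能读到该值)、顺序一致性(所有节点以同一顺序观察到全部操作,且该顺序符合每个节点自身的操作顺序)、因果一致性(存在因果关系的操作,所有节点按相同的因果顺序看到;无因果关系的并发操作顺序可以不同)以及最终一致性(停止写入后,所有副本最终收敛到相同的值)。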
1.4 分布式系统面临的挑战
┌─────────────────────────────────────────────────────────────┐
│ 分布式系统挑战 │
├─────────────────────────────────────────────────────────────┤
│ 1. 网络延迟:节点间通信不可预测,调用可能超时 │
│ 2. 时钟漂移:不同机器系统时钟不一致,依赖时间的逻辑不可靠 │
│ 3. 部分失败:部分节点故障而其余节点仍在运行,系统需容忍局部故障,不能因此整体不可用 │
│ 4. 脑裂:网络分区导致多个主节点同时存在 │
│ 5. 并发控制:多节点同时操作同一资源 │
└─────────────────────────────────────────────────────────────┘
二、服务治理
2.1 服务注册与发现
服务注册与发现是微服务架构的基石,解决服务实例动态变化的问题。
┌─────────────────────────────────────────────────────────────────┐
│ 服务注册与发现架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ 1.注册 ┌──────────────┐ │
│ │ Service │ ───────────────────→ │ Registry │ │
│ │ Provider │ ←─────────────────── │ (etcd/ │ │
│ └──────────┘ 心跳续约 │ ZooKeeper)│ │
│ │ └──────┬───────┘ │
│ │ │ │
│ │ 2.获取服务列表 │
│ │ │ │
│ │ ▼ │
│ │ ┌────────────────────────────────────────┐ │
│ │ │ Consumer │ │
│ │ │ ┌──────────────────────────────┐ │ │
│ │ └─→│ 本地缓存: [192.168.1.1:8080, │ │ │
│ │ │ 192.168.1.2:8080] │ │ │
│ │ └──────────────────────────────┘ │ │
│ │ │ │ │
│ └────────────────────┘ │ │
│ 3.负载均衡调用 │ │
│ │
└─────────────────────────────────────────────────────────────────┘
服务注册与发现核心实现(基于etcd客户端)
/**
 * Registers service instances in etcd under {@code /services/<serviceName>/<instanceId>} and
 * keeps them alive with a lease that is renewed periodically.
 */
@Service
public class ServiceRegistry {
    private final EtcdClient etcdClient;
    private final String registryPath = "/services/";
    private final long leaseTTL = 30; // lease TTL in seconds
    // Lease id of the most recent registration.
    private long leaseId;

    /**
     * Registers one instance and keeps it alive.
     *
     * <p>Grants a lease of {@code leaseTTL} seconds and writes {@code address:port} under the
     * instance key, bound to that lease. It then renews the lease every 5 seconds and installs a
     * JVM shutdown hook that stops renewal, deletes the key and revokes the lease. If the hook
     * never runs, the key disappears once the lease expires.
     *
     * @throws RuntimeException wrapping any failure while granting the lease or writing the key
     */
    public void register(String serviceName, String instanceId, String address, int port) {
        String key = registryPath + serviceName + "/" + instanceId;
        String value = address + ":" + port;
        try {
            Lease leaseClient = etcdClient.getLeaseClient();
            LeaseGrantResponse leaseResponse = leaseClient.grant(leaseTTL).get();
            // Keep THIS registration's lease id in a local. The heartbeat and shutdown hook below
            // must keep using it even if a later register() call overwrites the leaseId field.
            final long grantedLeaseId = leaseResponse.getID();
            leaseId = grantedLeaseId;

            // Write the registration before starting the heartbeat, so a failed put does not
            // leave a renewal thread running forever.
            etcdClient.getKVClient()
                    .put(ByteSequence.fromString(key), ByteSequence.fromString(value),
                            PutOption.newBuilder().setLeaseId(grantedLeaseId).build())
                    .get();
            log.info("服务注册成功: {} -> {}", key, value);

            // Renew every 5 s, well inside the 30 s TTL. Use a daemon thread: lease renewal alone
            // must not keep the JVM from exiting.
            ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor(task -> {
                Thread thread = new Thread(task, "etcd-lease-keepalive-" + instanceId);
                thread.setDaemon(true);
                return thread;
            });
            heartbeat.scheduleAtFixedRate(() -> {
                try {
                    leaseClient.keepAliveOnce(grantedLeaseId).get();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    log.error("心跳续约失败", e);
                }
            }, 5, 5, TimeUnit.SECONDS);

            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                heartbeat.shutdownNow(); // stop renewing before deregistering
                try {
                    etcdClient.getKVClient().delete(ByteSequence.fromString(key)).get();
                    // Wait for the revoke; an un-awaited future may never complete before exit.
                    leaseClient.revoke(grantedLeaseId).get();
                } catch (Exception e) {
                    log.error("服务注销失败", e);
                }
            }));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("服务注册失败", e);
        }
    }

    /**
     * Client-side discovery that caches instance lists per service. The cache is refreshed
     * whenever etcd reports a change under the registry prefix.
     */
    public class ServiceDiscovery {
        private final Map<String, List<ServiceInstance>> cache = new ConcurrentHashMap<>();
        private final Map<String, Watch> watches = new ConcurrentHashMap<>();

        /** Watches every key under the registry prefix and reloads the affected service on each event. */
        @PostConstruct
        public void init() {
            Watch watchClient = etcdClient.getWatchClient();
            watchClient.watch(ByteSequence.fromString(registryPath), WatchOption.newBuilder().isPrefix(true).build(),
                    watchResponse -> {
                        for (WatchEvent event : watchResponse.getEvents()) {
                            String key = event.getKeyValue().getKey().toStringUtf8();
                            // NOTE(review): extractServiceName is not defined in this snippet;
                            // presumably it returns the path segment after registryPath.
                            String serviceName = extractServiceName(key);
                            refreshCache(serviceName);
                        }
                    });
        }

        /**
         * Returns the cached instances of {@code serviceName} and loads them from etcd on a miss.
         * If loading fails, returns an empty list without caching it, so the next call retries.
         */
        public List<ServiceInstance> getInstances(String serviceName) {
            // Not computeIfAbsent: refreshCache() writes to the cache itself, and a
            // ConcurrentHashMap mapping function must not modify the map.
            List<ServiceInstance> cached = cache.get(serviceName);
            return cached != null ? cached : refreshCache(serviceName);
        }

        // Reloads all instances of one service from etcd and stores them in the cache.
        private List<ServiceInstance> refreshCache(String serviceName) {
            String prefix = registryPath + serviceName + "/";
            try {
                GetResponse response = etcdClient.getKVClient()
                        .get(ByteSequence.fromString(prefix), GetOption.newBuilder().isPrefix(true).build())
                        .get();
                List<ServiceInstance> instances = response.getKvs().stream()
                        .map(kv -> parseInstance(kv.getValue().toStringUtf8()))
                        .collect(Collectors.toList());
                cache.put(serviceName, instances);
                return instances;
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("刷新服务缓存失败", e);
                return Collections.emptyList();
            }
        }

        // Inverse of register()'s "address:port" value. It splits on the LAST ':' so that
        // addresses which themselves contain ':' (e.g. IPv6) still parse.
        private ServiceInstance parseInstance(String value) {
            int separator = value.lastIndexOf(':');
            return new ServiceInstance(value.substring(0, separator),
                    Integer.parseInt(value.substring(separator + 1)));
        }
    }
}
2.2 负载均衡算法实现
加权轮询(平滑加权轮询 - Nginx的SWRR算法)
public class SmoothWeightedRoundRobin {
private static class Server {
String address;
int weight; // 配置权重
int effectiveWeight; // 有效权重(动态调整)
int currentWeight; // 当前权重
Server(String address, int weight) {
this.address = address;
this.weight = weight;
this.effectiveWeight = weight;
this.currentWeight = 0;
}
}
private final List<Server> servers;
private int totalWeight;
public SmoothWeightedRoundRobin(Map<String, Integer> serverWeightMap) {
this.servers = new ArrayList<>();
for (Map.Entry<String, Integer> entry : serverWeightMap.entrySet()) {
Server server = new Server(entry.getKey(), entry.getValue());
this.servers.add(server);
this.totalWeight += entry.getValue();
}
}
// 平滑加权轮询算法
public synchronized Server next() {
Server selected = null;
int maxCurrent = -1;
// 1. 每个服务器的currentWeight += effectiveWeight
for (Server server : servers) {
server.currentWeight += server.effectiveWeight;
if (server.currentWeight > maxCurrent) {
maxCurrent = server.currentWeight;
selected = server;
}
}
// 2. 选中服务器的currentWeight -= totalWeight
if (selected != null) {
selected.currentWeight -= totalWeight;
}
return selected;
}
// 服务器故障时降低权重
public void markAsFailed(Server server) {
server.effectiveWeight -= server.weight / 2;
if (server.effectiveWeight < 0) {
server.effectiveWeight = 0;
}
}
// 测试输出
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
map.put("serverA", 5);
map.put("serverB", 1);
map.put("serverC", 1);
SmoothWeightedRoundRobin swrr = new SmoothWeightedRoundRobin(map);
// 模拟10次调用
Map<String, Integer> counts = new HashMap<>();
for (int i = 0; i < 70; i++) {
Server server = swrr.next();
counts.merge(server.address, 1, Integer::sum);
System.out.printf("第%d次请求: %s\n", i + 1, server.address);
}
System.out.println("最终分布: " + counts);
// 输出: serverA约50次, serverB约10次, serverC约10次(符合权重5:1:1)
}
}
一致性哈希(带虚拟节点)
/**
 * Consistent-hash router with virtual nodes. Each server is placed on the hash ring
 * {@code virtualNodeCount} times, under the keys {@code server#VN0}, {@code server#VN1}, and so
 * on. A key is routed to the first virtual node whose hash is greater than or equal to the key's
 * hash, wrapping around to the start of the ring.
 */
public class ConsistentHashRouter {
    private final SortedMap<Integer, String> circle = new TreeMap<>();
    private final int virtualNodeCount; // virtual nodes per server
    private final HashFunction hashFunction;

    public ConsistentHashRouter(List<String> servers, int virtualNodeCount) {
        this.virtualNodeCount = virtualNodeCount;
        this.hashFunction = new Murmur3Hash();
        for (String server : servers) {
            addServer(server);
        }
    }

    /** Adds a server's virtual nodes; on a hash collision the later node takes the ring slot. */
    public void addServer(String server) {
        for (int i = 0; i < virtualNodeCount; i++) {
            circle.put(hashFunction.hash(server + "#VN" + i), server);
        }
    }

    /**
     * Removes a server's virtual nodes. A ring slot that, through a hash collision, is owned by
     * another server is left untouched.
     */
    public void removeServer(String server) {
        for (int i = 0; i < virtualNodeCount; i++) {
            circle.remove(hashFunction.hash(server + "#VN" + i), server);
        }
    }

    /** Returns the server that owns {@code key}, or {@code null} when the ring is empty. */
    public String getServer(String key) {
        if (circle.isEmpty()) {
            return null;
        }
        int hash = hashFunction.hash(key);
        // First virtual node with hash >= key hash; wrap to the ring start if there is none.
        SortedMap<Integer, String> tailMap = circle.tailMap(hash);
        Integer nodeHash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        return circle.get(nodeHash);
    }

    /** Prints the share of 10,000 sample keys that change owner when "server3" is removed. */
    public void analyzeImpact() {
        analyzeImpact("server3", 10000);
    }

    /**
     * Measures how many of {@code sampleSize} generated keys change owner when
     * {@code removedServer} is taken off the ring. The percentage is printed. If the server was
     * on the ring, it is added back afterwards, so the router is left as it was found.
     *
     * @return the fraction of sample keys that moved; 0 when {@code sampleSize <= 0}
     */
    public double analyzeImpact(String removedServer, int sampleSize) {
        // Record every key's owner BEFORE the removal, to compare against afterwards.
        Map<String, String> ownerBefore = new HashMap<>();
        for (int i = 0; i < sampleSize; i++) {
            String key = "key-" + i;
            ownerBefore.put(key, getServer(key));
        }

        boolean wasPresent = circle.containsValue(removedServer);
        removeServer(removedServer);
        int moved = 0;
        try {
            for (Map.Entry<String, String> entry : ownerBefore.entrySet()) {
                String before = entry.getValue();
                String after = getServer(entry.getKey());
                if (before == null ? after != null : !before.equals(after)) {
                    moved++;
                }
            }
        } finally {
            if (wasPresent) {
                addServer(removedServer); // this is a simulation: restore the ring
            }
        }

        double affectedRatio = sampleSize <= 0 ? 0.0 : (double) moved / sampleSize;
        System.out.printf("移除服务器 %s 后,受影响比例: %.2f%%\n", removedServer, affectedRatio * 100);
        // Expected: roughly 1/N of the keys move. Only keys on the removed server's arcs are
        // reassigned, each to the next node clockwise.
        return affectedRatio;
    }

    // Murmur3 hash (good avalanche behaviour, low collision rate), via Guava.
    static class Murmur3Hash implements HashFunction {
        @Override
        public int hash(String key) {
            return Hashing.murmur3_32().hashBytes(key.getBytes(StandardCharsets.UTF_8)).asInt();
        }
    }
}
2.3 服务熔断、降级、限流
熔断器实现(状态机模式)
/**
 * Circuit breaker implemented as a CLOSED / OPEN / HALF_OPEN state machine.
 *
 * <ul>
 *   <li>CLOSED: calls pass through. The breaker opens once the sliding window holds at least
 *       {@code failureThreshold} failures and the failure rate is at least 50%.</li>
 *   <li>OPEN: calls go straight to the fallback until {@code timeout} ms have passed since
 *       opening.</li>
 *   <li>HALF_OPEN: calls are let through. {@code successThreshold} successes close the
 *       breaker; a single failure re-opens it.</li>
 * </ul>
 *
 * <p>State is read and changed only inside methods synchronized on this instance.
 */
public class CircuitBreaker {
    // Breaker states.
    private enum State {
        CLOSED,    // normal operation: calls are executed
        OPEN,      // tripped: calls go straight to the fallback
        HALF_OPEN  // probing: calls are executed to test whether the dependency recovered
    }

    private State state = State.CLOSED;

    // Configuration
    private final int failureThreshold; // minimum failures in the window before opening, e.g. 5
    private final int successThreshold; // successes needed in HALF_OPEN to close, e.g. 3
    private final long timeout;         // ms to stay OPEN before probing, e.g. 30000

    // Statistics
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicLong lastOpenTime = new AtomicLong(0);

    // Time-bucketed success/failure counts, used instead of a plain counter so old results age out.
    private final SlidingWindowCounter windowCounter;

    /**
     * @param failureThreshold minimum number of failures in the window before the breaker may open
     * @param successThreshold successes in HALF_OPEN required to close again
     * @param timeout          milliseconds to stay OPEN before moving to HALF_OPEN
     * @param windowSize       number of 1-second buckets in the sliding window
     */
    public CircuitBreaker(int failureThreshold, int successThreshold, long timeout, int windowSize) {
        this.failureThreshold = failureThreshold;
        this.successThreshold = successThreshold;
        this.timeout = timeout;
        this.windowCounter = new SlidingWindowCounter(windowSize, 1000); // one bucket per second
    }

    /**
     * Runs {@code supplier} if the breaker admits the call; otherwise returns
     * {@code fallback.get()}. An exception thrown by {@code supplier} is recorded as a failure
     * and rethrown unchanged.
     */
    public <T> T execute(Supplier<T> supplier, Supplier<T> fallback) {
        if (!allowRequest()) {
            return fallback.get(); // breaker open: degrade
        }
        try {
            T result = supplier.get();
            recordSuccess();
            return result;
        } catch (Exception e) {
            recordFailure();
            throw e;
        }
    }

    private synchronized boolean allowRequest() {
        switch (state) {
            case CLOSED:
                // Require a minimum number of failures as well as a high rate. Otherwise one
                // failed call (a 100% failure rate) would already trip the breaker.
                long failures = windowCounter.getFailureCount();
                double failureRate = windowCounter.getFailureRate();
                if (failures >= failureThreshold && failureRate >= 0.5) {
                    state = State.OPEN;
                    lastOpenTime.set(System.currentTimeMillis());
                    log.warn("熔断器开启,失败率: {}%", failureRate * 100);
                    return false;
                }
                return true;
            case OPEN:
                long elapsed = System.currentTimeMillis() - lastOpenTime.get();
                if (elapsed >= timeout) {
                    state = State.HALF_OPEN;
                    log.info("熔断器进入半开状态");
                    return true;
                }
                return false;
            case HALF_OPEN:
                // NOTE(review): every call is admitted while HALF_OPEN; the number of concurrent
                // probes is not limited.
                return true;
            default:
                return true;
        }
    }

    private synchronized void recordSuccess() {
        windowCounter.recordSuccess();
        if (state == State.HALF_OPEN) {
            int success = successCount.incrementAndGet();
            if (success >= successThreshold) {
                // Recovered: close the breaker and start from a clean window.
                state = State.CLOSED;
                failureCount.set(0);
                successCount.set(0);
                windowCounter.reset();
                log.info("熔断器关闭,服务恢复");
            }
        }
    }

    private synchronized void recordFailure() {
        windowCounter.recordFailure();
        if (state == State.HALF_OPEN) {
            // A single failure while probing re-opens the breaker immediately.
            state = State.OPEN;
            lastOpenTime.set(System.currentTimeMillis());
            successCount.set(0);
            log.warn("熔断器重新开启,半开状态失败");
        }
    }

    /**
     * Ring of {@code windowSize} time buckets, each {@code bucketDuration} ms long, that count
     * successes and failures. Buckets older than the window are cleared before every read and
     * every write.
     */
    static class SlidingWindowCounter {
        private final int windowSize;       // number of buckets
        private final long bucketDuration;  // time span of one bucket (ms)
        private final AtomicLong[] successBuckets;
        private final AtomicLong[] failureBuckets;
        private final AtomicLong lastBucketIndex = new AtomicLong(0);

        public SlidingWindowCounter(int windowSize, long bucketDuration) {
            this.windowSize = windowSize;
            this.bucketDuration = bucketDuration;
            this.successBuckets = new AtomicLong[windowSize];
            this.failureBuckets = new AtomicLong[windowSize];
            for (int i = 0; i < windowSize; i++) {
                successBuckets[i] = new AtomicLong(0);
                failureBuckets[i] = new AtomicLong(0);
            }
        }

        // Advances the window to the current time, clearing buckets that have expired, and
        // returns the index of the bucket for "now".
        private int getCurrentBucketIndex() {
            long now = System.currentTimeMillis();
            long currentBucket = now / bucketDuration;
            long previousBucket = lastBucketIndex.getAndUpdate(old -> Math.max(old, currentBucket));
            if (currentBucket > previousBucket) {
                long expiredBuckets = Math.min(windowSize, currentBucket - previousBucket);
                for (int i = 0; i < expiredBuckets; i++) {
                    int index = (int) ((previousBucket + i + 1) % windowSize);
                    successBuckets[index].set(0);
                    failureBuckets[index].set(0);
                }
            }
            return (int) (currentBucket % windowSize);
        }

        public void recordSuccess() {
            successBuckets[getCurrentBucketIndex()].incrementAndGet();
        }

        public void recordFailure() {
            failureBuckets[getCurrentBucketIndex()].incrementAndGet();
        }

        /** Failures recorded within the window. */
        public long getFailureCount() {
            getCurrentBucketIndex(); // roll forward so stale buckets are not counted
            long failures = 0;
            for (AtomicLong bucket : failureBuckets) {
                failures += bucket.get();
            }
            return failures;
        }

        /** Failure ratio within the window; 0 when no calls were recorded. */
        public double getFailureRate() {
            // Roll forward first. Otherwise, after a quiet period, failures older than the
            // window would still be counted and could open the breaker.
            getCurrentBucketIndex();
            long totalSuccess = 0;
            long totalFailure = 0;
            for (int i = 0; i < windowSize; i++) {
                totalSuccess += successBuckets[i].get();
                totalFailure += failureBuckets[i].get();
            }
            long total = totalSuccess + totalFailure;
            return total == 0 ? 0 : (double) totalFailure / total;
        }

        public void reset() {
            for (int i = 0; i < windowSize; i++) {
                successBuckets[i].set(0);
                failureBuckets[i].set(0);
            }
        }
    }
}
// 使用示例(结合Spring AOP)
/**
 * Marks a method to be guarded by a circuit breaker; intercepted by CircuitBreakerAspect.
 *
 * <p>NOTE(review): this annotation has the same simple name as the CircuitBreaker class. The
 * two must live in different packages, and code that uses both must fully qualify one of them.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CircuitBreaker {
    /** Milliseconds the breaker stays open before it starts probing again. */
    long timeout() default 30_000L;

    /** Successful probes required to close the breaker again. */
    int successThreshold() default 3;

    /** Failure threshold passed to the breaker instance. */
    int failureThreshold() default 5;
}
/**
 * Spring AOP advice that routes calls to methods carrying the CircuitBreaker annotation through
 * a CircuitBreaker instance. There is one breaker per method signature, configured from the
 * annotation.
 *
 * <p>NOTE(review): the annotation and the breaker class share the simple name CircuitBreaker.
 * This file only compiles if they come from different packages and one of them is fully
 * qualified.
 */
@Aspect
@Component
public class CircuitBreakerAspect {
    private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

    /**
     * Executes the intercepted method through its breaker. When the breaker is open, this logs
     * and returns {@code null} as the fallback. Exceptions thrown by the method, checked or
     * unchecked, reach the caller unchanged.
     */
    @Around("@annotation(circuitBreaker)")
    public Object around(ProceedingJoinPoint joinPoint, CircuitBreaker circuitBreaker) throws Throwable {
        String methodKey = joinPoint.getSignature().toLongString();
        CircuitBreaker breaker = breakers.computeIfAbsent(methodKey,
                k -> new CircuitBreaker(circuitBreaker.failureThreshold(),
                        circuitBreaker.successThreshold(),
                        circuitBreaker.timeout(), 10));
        try {
            return breaker.execute(
                    () -> {
                        try {
                            return joinPoint.proceed();
                        } catch (RuntimeException e) {
                            throw e; // keep the original type; the breaker still records it
                        } catch (Throwable t) {
                            // Supplier cannot throw checked exceptions: tunnel them (and Errors)
                            // through the breaker and unwrap below.
                            throw new TunneledThrowable(t);
                        }
                    },
                    () -> {
                        // Fallback. NOTE(review): null is unsuitable for primitive return types.
                        log.warn("服务熔断,执行降级: {}", methodKey);
                        return null;
                    });
        } catch (TunneledThrowable tunneled) {
            throw tunneled.getCause();
        }
    }

    // Carries a non-RuntimeException Throwable through Supplier.get(); never escapes this class.
    private static final class TunneledThrowable extends RuntimeException {
        TunneledThrowable(Throwable cause) {
            super(null, cause, false, false); // no stack trace: only the cause matters
        }
    }
}
限流算法实现
// 1. 令牌桶算法(支持突发流量)
/**
 * Thread-safe token bucket. It holds at most {@code capacity} tokens and adds
 * {@code refillTokens} tokens for every elapsed {@code refillIntervalMs}. A request is admitted
 * only if it can take a token. The bucket starts full, which allows bursts of up to
 * {@code capacity} requests.
 */
public class TokenBucketLimiter {
    private final long capacity;         // maximum number of stored tokens
    private final long refillTokens;     // tokens added per refill interval
    private final long refillIntervalMs; // refill interval (ms)
    private final AtomicLong availableTokens;
    private final AtomicLong lastRefillTime;

    /**
     * @throws IllegalArgumentException if {@code capacity} or {@code refillTokens} is negative,
     *                                  or {@code refillIntervalMs} is not positive
     */
    public TokenBucketLimiter(long capacity, long refillTokens, long refillIntervalMs) {
        if (capacity < 0 || refillTokens < 0 || refillIntervalMs <= 0) {
            throw new IllegalArgumentException("capacity=" + capacity + ", refillTokens=" + refillTokens
                    + ", refillIntervalMs=" + refillIntervalMs);
        }
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillIntervalMs = refillIntervalMs;
        this.availableTokens = new AtomicLong(capacity);
        this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
    }

    /** Tries to take one token without blocking; returns {@code true} if one was available. */
    public boolean tryAcquire() {
        refill();
        // Decrement only when a token exists. A rejected request must not push the balance
        // negative; otherwise later refills first pay off that debt and the limiter keeps
        // rejecting long after the overload has ended.
        while (true) {
            long tokens = availableTokens.get();
            if (tokens <= 0) {
                return false;
            }
            if (availableTokens.compareAndSet(tokens, tokens - 1)) {
                return true;
            }
        }
    }

    // Credits tokens for every whole interval elapsed since the last refill.
    private void refill() {
        long now = System.currentTimeMillis();
        if (now - lastRefillTime.get() < refillIntervalMs) {
            return; // fast path: no full interval has passed
        }
        synchronized (this) {
            long last = lastRefillTime.get(); // re-read: another thread may have refilled already
            long intervals = (now - last) / refillIntervalMs;
            if (intervals <= 0) {
                return;
            }
            long added = intervals * refillTokens;
            // updateAndGet, not set(): concurrent CAS decrements in tryAcquire must not be lost.
            availableTokens.updateAndGet(tokens -> Math.min(capacity, tokens + added));
            // Advance by whole intervals only, so the leftover fraction counts toward the next refill.
            lastRefillTime.set(last + intervals * refillIntervalMs);
        }
    }
}
// 2. 漏桶算法(平滑流量)
/**
 * Leaky bucket. Each admitted request adds one unit of "water", and water drains at
 * {@code leakRate} units per second. A request is admitted only while the bucket holds less
 * than {@code capacity}. {@link #tryAcquire()} is synchronized so that checking and filling
 * the bucket happen atomically.
 */
public class LeakyBucketLimiter {
    private final long capacity; // maximum amount of water
    private final long leakRate; // units drained per second
    private final AtomicLong water;
    private final AtomicLong lastLeakTime;

    public LeakyBucketLimiter(long capacity, long leakRate) {
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.water = new AtomicLong(0);
        this.lastLeakTime = new AtomicLong(System.currentTimeMillis());
    }

    /** Returns {@code true} and adds one unit if the bucket has room, otherwise {@code false}. */
    public synchronized boolean tryAcquire() {
        leak();
        // Check-then-increment must be atomic, or concurrent callers could overfill the bucket.
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        }
        return false;
    }

    // Drains the water that has leaked out since lastLeakTime. Caller holds the monitor.
    private void leak() {
        long now = System.currentTimeMillis();
        long last = lastLeakTime.get();
        long elapsed = now - last;
        if (elapsed <= 0) {
            return;
        }
        long leaked = elapsed * leakRate / 1000;
        if (leaked <= 0) {
            return; // not enough time for a whole unit yet; keep accumulating
        }
        long remaining = Math.max(0, water.get() - leaked);
        water.set(remaining);
        // While water remains, advance the clock only by the time the leaked units took, so a
        // partial unit's progress is not thrown away. An empty bucket restarts from now, so
        // idle time is not banked.
        lastLeakTime.set(remaining == 0 ? now : last + leaked * 1000 / leakRate);
    }
}
// 3. 滑动窗口限流(精确控制,适用于时间窗口均匀分布)
/**
 * Sliding-window rate limiter. The window of {@code windowSizeMs} is split into
 * {@code bucketCount} buckets of {@code windowSizeMs / bucketCount} ms each. A request is
 * admitted when the buckets of the last {@code bucketCount} periods hold fewer than
 * {@code limit} requests. More buckets give a smoother, more precise window.
 * {@link #tryAcquire()} is synchronized, so the count-then-increment is atomic.
 */
public class SlidingWindowRateLimiter {
    private final int limit;           // maximum requests per window
    private final long windowSizeMs;   // window length (ms)
    private final int bucketCount;     // number of buckets in the window
    private final long bucketDurationMs;
    private final AtomicLong[] counters;    // requests recorded in each bucket
    private final AtomicLong[] bucketSlots; // absolute period (now / bucketDurationMs) each count belongs to

    /**
     * @throws IllegalArgumentException if {@code windowSizeMs} or {@code bucketCount} is not
     *         positive, or {@code bucketCount > windowSizeMs} (buckets shorter than 1 ms)
     */
    public SlidingWindowRateLimiter(int limit, long windowSizeMs, int bucketCount) {
        if (windowSizeMs <= 0 || bucketCount <= 0 || bucketCount > windowSizeMs) {
            throw new IllegalArgumentException(
                    "windowSizeMs=" + windowSizeMs + ", bucketCount=" + bucketCount);
        }
        this.limit = limit;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.bucketDurationMs = windowSizeMs / bucketCount;
        this.counters = new AtomicLong[bucketCount];
        this.bucketSlots = new AtomicLong[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            counters[i] = new AtomicLong(0);
            bucketSlots[i] = new AtomicLong(-1);
        }
    }

    /** Returns {@code true} and records the request if the window still has capacity. */
    public synchronized boolean tryAcquire() {
        long slot = System.currentTimeMillis() / bucketDurationMs;
        int index = (int) (slot % bucketCount);
        // The bucket is being reused for a new period: its old count belongs to a past window.
        if (bucketSlots[index].get() != slot) {
            bucketSlots[index].set(slot);
            counters[index].set(0);
        }

        long total = 0;
        for (int i = 0; i < bucketCount; i++) {
            // Count only buckets whose period lies inside the current window.
            if (slot - bucketSlots[i].get() < bucketCount) {
                total += counters[i].get();
            }
        }
        if (total >= limit) {
            return false;
        }
        counters[index].incrementAndGet();
        return true;
    }
}
// 4. 基于Redis的分布式限流(Lua脚本保证原子性)
/**
 * Distributed token-bucket rate limiter backed by Redis. Each client's bucket state (tokens,
 * last refill time in seconds) lives in a hash under {@code rate_limiter:<clientId>}. A single
 * Lua script reads, refills and updates that state, so each check is atomic on the Redis server.
 */
public class RedisRateLimiter {
    private final StringRedisTemplate redisTemplate;
    private final String keyPrefix = "rate_limiter:";

    // Lua token bucket.
    //   ARGV: capacity, refill rate (tokens per second), now (epoch seconds), tokens requested.
    //   Returns 1 if allowed, 0 otherwise.
    // "now" comes from the calling application servers, whose clocks can disagree. Elapsed time
    // is therefore clamped at 0, and lastRefillTime never moves backwards. Without this, a
    // caller with a lagging clock would drain tokens (negative refill), and a later caller would
    // be credited the same interval twice.
    private static final String LUA_SCRIPT =
            "local key = KEYS[1] " +
            "local capacity = tonumber(ARGV[1]) " +
            "local refillRate = tonumber(ARGV[2]) " +
            "local now = tonumber(ARGV[3]) " +
            "local requested = tonumber(ARGV[4]) " +
            "local lastRefillTime = tonumber(redis.call('HGET', key, 'lastRefillTime')) " +
            "local tokens = tonumber(redis.call('HGET', key, 'tokens')) " +
            "if lastRefillTime == nil or tokens == nil then " +
            "  tokens = capacity " +
            "  lastRefillTime = now " +
            "else " +
            "  local elapsed = math.max(0, now - lastRefillTime) " +
            "  tokens = math.min(capacity, tokens + elapsed * refillRate) " +
            "  lastRefillTime = math.max(lastRefillTime, now) " +
            "end " +
            "local allowed = 0 " +
            "if tokens >= requested then " +
            "  allowed = 1 " +
            "  tokens = tokens - requested " +
            "end " +
            "redis.call('HMSET', key, 'lastRefillTime', lastRefillTime, 'tokens', tokens) " +
            "redis.call('EXPIRE', key, 3600) " +
            "return allowed";

    private final RedisScript<Long> rateLimiterScript;

    public RedisRateLimiter(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.rateLimiterScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
    }

    /**
     * Tries to take one token from {@code clientId}'s bucket.
     *
     * @param capacity   bucket size (maximum burst)
     * @param refillRate tokens added per second
     * @return {@code true} if the request is allowed
     */
    public boolean tryAcquire(String clientId, int capacity, int refillRate) {
        String key = keyPrefix + clientId;
        long now = System.currentTimeMillis() / 1000; // epoch seconds
        Long result = redisTemplate.execute(
                rateLimiterScript,
                Collections.singletonList(key),
                String.valueOf(capacity),
                String.valueOf(refillRate),
                String.valueOf(now),
                "1" // request one token
        );
        return result != null && result == 1L;
    }
}