程序员必备的十大技能(进阶版)之分布式核心技术(一)

简介: 教程来源 http://unbgv.cn/ 本文系统剖析分布式核心技术,涵盖CAP/BASE理论、服务治理、一致性协议、分布式事务、锁、消息中间件、负载均衡、存储及可观测性九大维度,直击微服务演进中的核心挑战与落地实践。

在单体应用向微服务架构演进的今天,分布式系统已经成为大型互联网公司的标配。分布式系统的核心挑战在于:如何让多个独立的节点协同工作,对外表现得像一个整体。这涉及到数据一致性、服务发现、负载均衡、分布式事务、分布式锁、消息队列、链路追踪等一系列核心技术。

本文将围绕“分布式核心技术”这一主题,从分布式理论基础、服务治理、分布式一致性协议、分布式事务、分布式锁、消息中间件、负载均衡与容错、分布式存储、链路追踪与可观测性九个维度,深入剖析分布式系统的底层原理与实践。

一、分布式理论基础

1.1 CAP定理
CAP定理由Eric Brewer提出,指出分布式系统无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance),最多只能满足其中两个。

                    ┌─────────────┐
                    │   Consistency │
                    │     (C)       │
                    └──────┬───────┘
                           │
            CP系统 ←───────┼───────→ AP系统
          (ZooKeeper/     │      (Eureka/
           HBase)         │       Cassandra)
                           │
                    ┌──────┴───────┐
                    │  Availability │
                    │     (A)       │
                    └──────────────┘

              Partition Tolerance (P) 是分布式系统的必选项

image.png
关键理解:网络分区一定会发生,所以P是必选项。实际选择是CP还是AP。
1.2 BASE理论
BASE是对CAP中AP的延伸,强调通过最终一致性来获得高可用。

BA:Basically Available(基本可用)—— 允许部分功能降级

S:Soft State(软状态)—— 允许中间状态存在

E:Eventually Consistent(最终一致性)—— 经过一段时间后,数据最终达到一致

1.3 一致性模型
image.png
1.4 分布式系统面临的挑战

┌─────────────────────────────────────────────────────────────┐
│                      分布式系统挑战                          │
├─────────────────────────────────────────────────────────────┤
│ 1. 网络延迟:节点间通信不可预测,调用可能超时                 │
│ 2. 时钟漂移:不同机器系统时钟不一致,依赖时间的逻辑不可靠      │
│ 3. 部分失败:部分节点失败,整个系统不能完全不可用             │
│ 4. 脑裂:网络分区导致多个主节点同时存在                      │
│ 5. 并发控制:多节点同时操作同一资源                          │
└─────────────────────────────────────────────────────────────┘

二、服务治理

2.1 服务注册与发现
服务注册与发现是微服务架构的基石,解决服务实例动态变化的问题。

┌─────────────────────────────────────────────────────────────────┐
│                        服务注册与发现架构                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌──────────┐        1.注册         ┌──────────────┐          │
│   │ Service  │ ───────────────────→  │   Registry   │          │
│   │ Provider │ ←───────────────────  │   (etcd/     │          │
│   └──────────┘        心跳续约        │    ZooKeeper)│          │
│        │                              └──────┬───────┘          │
│        │                                      │                  │
│        │                             2.获取服务列表              │
│        │                                      │                  │
│        │                                      ▼                  │
│        │ ┌────────────────────────────────────────┐             │
│        │ │               Consumer                  │             │
│        │ │   ┌──────────────────────────────┐     │             │
│        │ └─→│  本地缓存: [192.168.1.1:8080, │     │             │
│        │    │           192.168.1.2:8080]   │     │             │
│        │    └──────────────────────────────┘     │             │
│        │                    │                     │             │
│        └────────────────────┘                     │             │
│                   3.负载均衡调用                   │             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

服务注册核心实现(模拟etcd)

@Service
public class ServiceRegistry {

    private final EtcdClient etcdClient;
    private final String registryPath = "/services/";
    private final long leaseTTL = 30;  // 30秒租约
    private long leaseId;

    // 服务注册(带健康检查)
    public void register(String serviceName, String instanceId, String address, int port) {
        String key = registryPath + serviceName + "/" + instanceId;
        String value = address + ":" + port;

        try {
            // 创建租约(TTL),服务下线后自动过期
            Lease leaseClient = etcdClient.getLeaseClient();
            LeaseGrantResponse leaseResponse = leaseClient.grant(leaseTTL).get();
            leaseId = leaseResponse.getID();

            // 启动心跳续约
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleAtFixedRate(() -> {
                try {
                    leaseClient.keepAliveOnce(leaseId).get();
                } catch (Exception e) {
                    log.error("心跳续约失败", e);
                }
            }, 5, 5, TimeUnit.SECONDS);

            // 写入注册信息
            PutResponse response = etcdClient.getKVClient()
                .put(ByteSequence.fromString(key), ByteSequence.fromString(value), PutOption.newBuilder().setLeaseId(leaseId).build())
                .get();

            log.info("服务注册成功: {} -> {}", key, value);

            // 注册JVM关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    etcdClient.getKVClient().delete(ByteSequence.fromString(key)).get();
                    leaseClient.revoke(leaseId);
                } catch (Exception e) {
                    log.error("服务注销失败", e);
                }
            }));

        } catch (Exception e) {
            throw new RuntimeException("服务注册失败", e);
        }
    }

    // 服务发现(带本地缓存和监听)
    public class ServiceDiscovery {

        private final Map<String, List<ServiceInstance>> cache = new ConcurrentHashMap<>();
        private final Map<String, Watch> watches = new ConcurrentHashMap<>();

        @PostConstruct
        public void init() {
            // 监听服务变更
            Watch watchClient = etcdClient.getWatchClient();
            watchClient.watch(ByteSequence.fromString(registryPath), WatchOption.newBuilder().isPrefix(true).build(),
                watchResponse -> {
                    for (WatchEvent event : watchResponse.getEvents()) {
                        String key = event.getKeyValue().getKey().toStringUtf8();
                        String serviceName = extractServiceName(key);
                        refreshCache(serviceName);
                    }
                });
        }

        public List<ServiceInstance> getInstances(String serviceName) {
            return cache.computeIfAbsent(serviceName, k -> refreshCache(serviceName));
        }

        private List<ServiceInstance> refreshCache(String serviceName) {
            String prefix = registryPath + serviceName + "/";
            try {
                GetResponse response = etcdClient.getKVClient()
                    .get(ByteSequence.fromString(prefix), GetOption.newBuilder().isPrefix(true).build())
                    .get();

                List<ServiceInstance> instances = response.getKvs().stream()
                    .map(kv -> {
                        String value = kv.getValue().toStringUtf8();
                        String[] parts = value.split(":");
                        return new ServiceInstance(parts[0], Integer.parseInt(parts[1]));
                    })
                    .collect(Collectors.toList());

                cache.put(serviceName, instances);
                return instances;
            } catch (Exception e) {
                log.error("刷新服务缓存失败", e);
                return Collections.emptyList();
            }
        }
    }
}

2.2 负载均衡算法实现
加权轮询(平滑加权轮询 - Nginx的SWRR算法)

public class SmoothWeightedRoundRobin {

    private static class Server {
        String address;
        int weight;        // 配置权重
        int effectiveWeight; // 有效权重(动态调整)
        int currentWeight;   // 当前权重

        Server(String address, int weight) {
            this.address = address;
            this.weight = weight;
            this.effectiveWeight = weight;
            this.currentWeight = 0;
        }
    }

    private final List<Server> servers;
    private int totalWeight;

    public SmoothWeightedRoundRobin(Map<String, Integer> serverWeightMap) {
        this.servers = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : serverWeightMap.entrySet()) {
            Server server = new Server(entry.getKey(), entry.getValue());
            this.servers.add(server);
            this.totalWeight += entry.getValue();
        }
    }

    // 平滑加权轮询算法
    public synchronized Server next() {
        Server selected = null;
        int maxCurrent = -1;

        // 1. 每个服务器的currentWeight += effectiveWeight
        for (Server server : servers) {
            server.currentWeight += server.effectiveWeight;
            if (server.currentWeight > maxCurrent) {
                maxCurrent = server.currentWeight;
                selected = server;
            }
        }

        // 2. 选中服务器的currentWeight -= totalWeight
        if (selected != null) {
            selected.currentWeight -= totalWeight;
        }

        return selected;
    }

    // 服务器故障时降低权重
    public void markAsFailed(Server server) {
        server.effectiveWeight -= server.weight / 2;
        if (server.effectiveWeight < 0) {
            server.effectiveWeight = 0;
        }
    }

    // 测试输出
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("serverA", 5);
        map.put("serverB", 1);
        map.put("serverC", 1);

        SmoothWeightedRoundRobin swrr = new SmoothWeightedRoundRobin(map);

        // 模拟10次调用
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < 70; i++) {
            Server server = swrr.next();
            counts.merge(server.address, 1, Integer::sum);
            System.out.printf("第%d次请求: %s\n", i + 1, server.address);
        }
        System.out.println("最终分布: " + counts);
        // 输出: serverA约50次, serverB约10次, serverC约10次(符合权重5:1:1)
    }
}

一致性哈希(带虚拟节点)

public class ConsistentHashRouter {

    private final SortedMap<Integer, String> circle = new TreeMap<>();
    private final int virtualNodeCount;  // 虚拟节点数
    private final HashFunction hashFunction;

    public ConsistentHashRouter(List<String> servers, int virtualNodeCount) {
        this.virtualNodeCount = virtualNodeCount;
        this.hashFunction = new Murmur3Hash();

        for (String server : servers) {
            addServer(server);
        }
    }

    // 添加服务器(包含虚拟节点)
    public void addServer(String server) {
        for (int i = 0; i < virtualNodeCount; i++) {
            String virtualNodeKey = server + "#VN" + i;
            int hash = hashFunction.hash(virtualNodeKey);
            circle.put(hash, server);
        }
    }

    // 移除服务器
    public void removeServer(String server) {
        for (int i = 0; i < virtualNodeCount; i++) {
            String virtualNodeKey = server + "#VN" + i;
            int hash = hashFunction.hash(virtualNodeKey);
            circle.remove(hash);
        }
    }

    // 根据key获取服务器
    public String getServer(String key) {
        if (circle.isEmpty()) {
            return null;
        }
        int hash = hashFunction.hash(key);

        // 找到大于等于hash的第一个节点
        SortedMap<Integer, String> tailMap = circle.tailMap(hash);
        Integer nodeHash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();

        return circle.get(nodeHash);
    }

    // 模拟节点变化影响分析
    public void analyzeImpact() {
        List<String> testKeys = generateTestKeys(10000);

        // 原始分布
        Map<String, Integer> originalDistribution = new HashMap<>();
        for (String key : testKeys) {
            String server = getServer(key);
            originalDistribution.merge(server, 1, Integer::sum);
        }

        // 移除一个节点
        String removedServer = "server3";
        removeServer(removedServer);

        // 新分布
        Map<String, Integer> newDistribution = new HashMap<>();
        Map<String, Integer> affectedCount = new HashMap<>();

        for (String key : testKeys) {
            String oldServer = getOriginalServer(key);  // 原算法
            String newServer = getServer(key);

            newDistribution.merge(newServer, 1, Integer::sum);

            if (!oldServer.equals(newServer)) {
                affectedCount.merge(key, 1, Integer::sum);
            }
        }

        double affectedRatio = (double) affectedCount.size() / testKeys.size();
        System.out.printf("移除服务器 %s 后,受影响比例: %.2f%%\n", removedServer, affectedRatio * 100);
        // 输出: 受影响比例约 1/节点数(因为一致性哈希只影响相邻节点)
    }

    // Murmur3哈希(雪崩效应好,碰撞率低)
    static class Murmur3Hash implements HashFunction {
        @Override
        public int hash(String key) {
            // 实现Murmur3哈希算法
            return Hashing.murmur3_32().hashBytes(key.getBytes(StandardCharsets.UTF_8)).asInt();
        }
    }
}

2.3 服务熔断、降级、限流
熔断器实现(状态机模式)

public class CircuitBreaker {

    // 熔断器状态
    private enum State {
        CLOSED,      // 关闭状态:正常调用
        OPEN,        // 开启状态:熔断,直接返回失败
        HALF_OPEN    // 半开状态:尝试恢复
    }

    private State state = State.CLOSED;

    // 配置参数
    private final int failureThreshold;      // 失败阈值,如5次
    private final int successThreshold;      // 半开状态成功阈值,如3次
    private final long timeout;              // 熔断开启后等待时间,如30秒

    // 统计信息
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicLong lastOpenTime = new AtomicLong(0);

    // 滑动窗口计数器(替代简单计数器,防止突发流量)
    private final SlidingWindowCounter windowCounter;

    public CircuitBreaker(int failureThreshold, int successThreshold, long timeout, int windowSize) {
        this.failureThreshold = failureThreshold;
        this.successThreshold = successThreshold;
        this.timeout = timeout;
        this.windowCounter = new SlidingWindowCounter(windowSize, 1000);  // 1秒一个桶
    }

    // 执行受保护的方法
    public <T> T execute(Supplier<T> supplier, Supplier<T> fallback) {
        if (!allowRequest()) {
            // 熔断开启,执行降级
            return fallback.get();
        }

        try {
            T result = supplier.get();
            recordSuccess();
            return result;
        } catch (Exception e) {
            recordFailure();
            throw e;
        }
    }

    private synchronized boolean allowRequest() {
        switch (state) {
            case CLOSED:
                // 检查失败率
                double failureRate = windowCounter.getFailureRate();
                if (failureRate >= 0.5) {  // 失败率超过50%开启熔断
                    state = State.OPEN;
                    lastOpenTime.set(System.currentTimeMillis());
                    log.warn("熔断器开启,失败率: {}%", failureRate * 100);
                    return false;
                }
                return true;

            case OPEN:
                // 检查是否到达超时时间
                long elapsed = System.currentTimeMillis() - lastOpenTime.get();
                if (elapsed >= timeout) {
                    state = State.HALF_OPEN;
                    log.info("熔断器进入半开状态");
                    return true;
                }
                return false;

            case HALF_OPEN:
                return true;

            default:
                return true;
        }
    }

    private synchronized void recordSuccess() {
        windowCounter.recordSuccess();
        if (state == State.HALF_OPEN) {
            int success = successCount.incrementAndGet();
            if (success >= successThreshold) {
                // 恢复成功,关闭熔断器
                state = State.CLOSED;
                failureCount.set(0);
                successCount.set(0);
                windowCounter.reset();
                log.info("熔断器关闭,服务恢复");
            }
        }
    }

    private synchronized void recordFailure() {
        windowCounter.recordFailure();
        if (state == State.HALF_OPEN) {
            // 半开状态失败一次,立即重新开启熔断
            state = State.OPEN;
            lastOpenTime.set(System.currentTimeMillis());
            successCount.set(0);
            log.warn("熔断器重新开启,半开状态失败");
        }
    }

    // 滑动窗口计数器实现
    static class SlidingWindowCounter {
        private final int windowSize;      // 窗口大小(桶数量)
        private final long bucketDuration; // 每个桶的时间跨度(ms)
        private final AtomicLong[] successBuckets;
        private final AtomicLong[] failureBuckets;
        private final AtomicLong lastBucketIndex = new AtomicLong(0);

        public SlidingWindowCounter(int windowSize, long bucketDuration) {
            this.windowSize = windowSize;
            this.bucketDuration = bucketDuration;
            this.successBuckets = new AtomicLong[windowSize];
            this.failureBuckets = new AtomicLong[windowSize];
            for (int i = 0; i < windowSize; i++) {
                successBuckets[i] = new AtomicLong(0);
                failureBuckets[i] = new AtomicLong(0);
            }
        }

        private int getCurrentBucketIndex() {
            long now = System.currentTimeMillis();
            long currentBucket = now / bucketDuration;
            long previousBucket = lastBucketIndex.getAndUpdate(old -> Math.max(old, currentBucket));

            if (currentBucket > previousBucket) {
                // 时间推进,清理过期的桶
                long expiredBuckets = Math.min(windowSize, currentBucket - previousBucket);
                for (int i = 0; i < expiredBuckets; i++) {
                    int index = (int) ((previousBucket + i + 1) % windowSize);
                    successBuckets[index].set(0);
                    failureBuckets[index].set(0);
                }
            }
            return (int) (currentBucket % windowSize);
        }

        public void recordSuccess() {
            successBuckets[getCurrentBucketIndex()].incrementAndGet();
        }

        public void recordFailure() {
            failureBuckets[getCurrentBucketIndex()].incrementAndGet();
        }

        public double getFailureRate() {
            long totalSuccess = 0;
            long totalFailure = 0;
            for (int i = 0; i < windowSize; i++) {
                totalSuccess += successBuckets[i].get();
                totalFailure += failureBuckets[i].get();
            }
            long total = totalSuccess + totalFailure;
            return total == 0 ? 0 : (double) totalFailure / total;
        }

        public void reset() {
            for (int i = 0; i < windowSize; i++) {
                successBuckets[i].set(0);
                failureBuckets[i].set(0);
            }
        }
    }
}

// 使用示例(结合Spring AOP)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CircuitBreaker {
    int failureThreshold() default 5;
    int successThreshold() default 3;
    long timeout() default 30000;
}

@Aspect
@Component
public class CircuitBreakerAspect {

    private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

    @Around("@annotation(circuitBreaker)")
    public Object around(ProceedingJoinPoint joinPoint, CircuitBreaker circuitBreaker) throws Throwable {
        String methodKey = joinPoint.getSignature().toLongString();
        CircuitBreaker breaker = breakers.computeIfAbsent(methodKey, 
            k -> new CircuitBreaker(circuitBreaker.failureThreshold(), 
                                    circuitBreaker.successThreshold(), 
                                    circuitBreaker.timeout(), 10));

        return breaker.execute(
            () -> {
                try {
                    return joinPoint.proceed();
                } catch (Throwable t) {
                    throw new RuntimeException(t);
                }
            },
            () -> {
                // 降级逻辑
                log.warn("服务熔断,执行降级: {}", methodKey);
                return null;
            }
        );
    }
}

限流算法实现

// 1. 令牌桶算法(支持突发流量)
public class TokenBucketLimiter {
    private final long capacity;           // 桶容量
    private final long refillTokens;       // 每次补充的令牌数
    private final long refillIntervalMs;   // 补充间隔(ms)

    private AtomicLong availableTokens;
    private AtomicLong lastRefillTime;

    public TokenBucketLimiter(long capacity, long refillTokens, long refillIntervalMs) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillIntervalMs = refillIntervalMs;
        this.availableTokens = new AtomicLong(capacity);
        this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
    }

    public boolean tryAcquire() {
        refillTokens();
        return availableTokens.decrementAndGet() >= 0;
    }

    private void refillTokens() {
        long now = System.currentTimeMillis();
        long last = lastRefillTime.get();
        long elapsed = now - last;

        if (elapsed >= refillIntervalMs) {
            synchronized (this) {
                // 双重检查
                if (now - lastRefillTime.get() >= refillIntervalMs) {
                    long newTokens = Math.min(capacity, 
                        availableTokens.get() + (elapsed / refillIntervalMs) * refillTokens);
                    availableTokens.set(newTokens);
                    lastRefillTime.set(now);
                }
            }
        }
    }
}

// 2. 漏桶算法(平滑流量)
public class LeakyBucketLimiter {
    private final long capacity;       // 桶容量
    private final long leakRate;       // 漏水速率(请求/秒)

    private AtomicLong water;          // 当前水量
    private AtomicLong lastLeakTime;

    public LeakyBucketLimiter(long capacity, long leakRate) {
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.water = new AtomicLong(0);
        this.lastLeakTime = new AtomicLong(System.currentTimeMillis());
    }

    public boolean tryAcquire() {
        leak();
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        }
        return false;
    }

    private void leak() {
        long now = System.currentTimeMillis();
        long last = lastLeakTime.get();
        long elapsed = now - last;

        if (elapsed > 0) {
            long leaked = elapsed * leakRate / 1000;  // 漏出的水量
            if (leaked > 0) {
                water.updateAndGet(w -> Math.max(0, w - leaked));
                lastLeakTime.set(now);
            }
        }
    }
}

// 3. 滑动窗口限流(精确控制,适用于时间窗口均匀分布)
public class SlidingWindowRateLimiter {
    private final int limit;           // 窗口内限制请求数
    private final long windowSizeMs;   // 窗口大小(ms)
    private final int bucketCount;     // 桶数量

    private final AtomicLong[] counters;
    private final AtomicLong[] timestamps;
    private final AtomicLong lastUpdateTime;

    public SlidingWindowRateLimiter(int limit, long windowSizeMs, int bucketCount) {
        this.limit = limit;
        this.windowSizeMs = windowSizeMs;
        this.bucketCount = bucketCount;
        this.counters = new AtomicLong[bucketCount];
        this.timestamps = new AtomicLong[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            counters[i] = new AtomicLong(0);
            timestamps[i] = new AtomicLong(0);
        }
        this.lastUpdateTime = new AtomicLong(System.currentTimeMillis());
    }

    public boolean tryAcquire() {
        long now = System.currentTimeMillis();
        long bucketDuration = windowSizeMs / bucketCount;
        int currentBucket = (int) ((now / bucketDuration) % bucketCount);

        // 清理过期的桶
        long currentWindowStart = now - windowSizeMs;
        for (int i = 0; i < bucketCount; i++) {
            if (timestamps[i].get() < currentWindowStart) {
                counters[i].set(0);
                timestamps[i].set(now);
            }
        }

        // 统计窗口内总请求数
        long total = 0;
        for (int i = 0; i < bucketCount; i++) {
            total += counters[i].get();
        }

        if (total >= limit) {
            return false;
        }

        // 当前桶计数+1
        counters[currentBucket].incrementAndGet();
        timestamps[currentBucket].set(now);
        return true;
    }
}

// 4. 基于Redis的分布式限流(Lua脚本保证原子性)
public class RedisRateLimiter {

    private final StringRedisTemplate redisTemplate;
    private final String keyPrefix = "rate_limiter:";

    // Lua脚本:令牌桶算法(原子操作)
    private final String LUA_SCRIPT = 
        "local key = KEYS[1] " +
        "local capacity = tonumber(ARGV[1]) " +
        "local refillRate = tonumber(ARGV[2]) " +
        "local now = tonumber(ARGV[3]) " +
        "local requested = tonumber(ARGV[4]) " +

        "local lastRefillTime = redis.call('HGET', key, 'lastRefillTime') " +
        "local tokens = redis.call('HGET', key, 'tokens') " +

        "if lastRefillTime == false then " +
        "   tokens = capacity " +
        "   lastRefillTime = now " +
        "else " +
        "   local elapsed = now - lastRefillTime " +
        "   local refill = elapsed * refillRate " +
        "   tokens = math.min(capacity, tokens + refill) " +
        "   lastRefillTime = now " +
        "end " +

        "local allowed = 0 " +
        "if tokens >= requested then " +
        "   allowed = 1 " +
        "   tokens = tokens - requested " +
        "end " +

        "redis.call('HMSET', key, 'lastRefillTime', lastRefillTime, 'tokens', tokens) " +
        "redis.call('EXPIRE', key, 3600) " +

        "return allowed";

    private final RedisScript<Long> rateLimiterScript;

    public RedisRateLimiter(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.rateLimiterScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
    }

    public boolean tryAcquire(String clientId, int capacity, int refillRate) {
        String key = keyPrefix + clientId;
        long now = System.currentTimeMillis() / 1000;  // 秒级时间戳

        Long result = redisTemplate.execute(
            rateLimiterScript,
            Collections.singletonList(key),
            String.valueOf(capacity),
            String.valueOf(refillRate),
            String.valueOf(now),
            "1"  // 请求1个令牌
        );

        return result != null && result == 1L;
    }
}

来源:
http://oieaw.cn/

相关文章
|
8天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
2763 15
|
6天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2304 4
|
21天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23554 13
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
8天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2055 1
|
2天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1306 1
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
14天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
3457 5
|
7天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1095 0