分布式架构的核心本质,是将单体系统拆分为多个跨网络的服务节点,而网络通信的不可靠性,是所有分布式故障的根源。无论是网络抖动、服务GC停顿、节点宕机,还是流量突增导致的响应延迟,最终都会体现在服务调用的异常上。超时、重试、幂等,正是解决分布式调用可用性与一致性问题的三大核心基石,三者并非孤立的技术点,而是一套环环相扣的架构级规范体系。本文将从底层逻辑出发,拆解三大能力的架构设计标准、落地实现方案与全场景避坑指南,帮助开发者从根源上解决分布式调用的稳定性问题。
一、超时机制:分布式调用的第一道防线
1.1 超时的底层逻辑与核心价值
在分布式调用中,没有超时控制的请求,就是一颗随时会爆炸的炸弹。当发起一次跨服务调用后,如果下游服务长期不返回响应,调用方的线程会一直阻塞等待,而服务的线程池资源是有限的,一旦大量线程被阻塞,会直接导致服务吞吐量下降,甚至引发线程池耗尽、服务雪崩,最终导致整个链路不可用。 超时机制的核心价值,是给每一次分布式调用设定一个明确的“生死线”,一旦超过约定时间未收到响应,就主动中断请求、释放资源,避免无效的资源占用,防止故障的向上传导。
1.2 超时的分层架构与设计规范
分布式调用的超时设计,绝非在接口上设置一个固定的超时时间即可,而是需要一套全链路的分层超时体系,每一层的超时设计都有明确的边界与规则。
| 分层 | 超时类型 | 核心作用 | 设计规范 |
| 业务层 | 全链路超时预算 | 定义整个业务请求的最大可接受耗时 | 必须基于业务SLA设定,是全链路所有调用的超时上限 |
| 接口层 | 服务调用超时 | 单次RPC/HTTP服务调用的最大等待时间 | 必须小于全链路超时预算,且大于下游接口的P99响应时间 |
| 传输层 | 连接超时/读取超时 | TCP连接建立、数据传输的超时控制 | 连接超时建议设置为500-1000ms,读取超时需与接口层超时匹配 |
| 资源层 | 连接池超时/数据库超时 | 连接池获取连接、数据库执行的超时控制 | 必须小于接口层超时,避免资源获取耗时占用接口超时预算 |
核心设计规范:
- 超时预算递减原则:全链路超时预算必须逐层向下传递并扣减,避免下游执行时间超过上游的超时上限。例如全链路SLA为1000ms,服务A调用服务B的超时设置为800ms,服务B调用服务C的超时设置为600ms,确保上游超时触发时,下游的所有调用都能被及时中断。
- 超时时间适配原则:单次调用的超时时间,必须大于下游服务的P99响应时间,同时预留30%以上的冗余量,避免因正常的网络波动、GC停顿导致的频繁超时。严禁将超时时间设置为小于下游接口的平均响应时间,否则会出现大量正常请求被超时中断的情况。
- 中断式超时原则:超时触发时,必须主动中断下游的执行线程,而非仅放弃等待响应。仅放弃等待会导致下游线程继续执行,造成数据不一致与资源浪费,Java中可通过CompletableFuture的cancel方法主动中断执行线程。
- 超时异常区分原则:必须明确区分连接超时、读取超时、业务超时三种异常类型,为后续的重试机制提供判断依据。连接超时、读取超时属于可重试异常,业务超时需根据业务场景判断是否可重试。
1.3 超时机制的落地实现
1.3.1 全链路超时预算的透传实现
基于MDC实现全链路traceId与超时预算的透传,确保每一次调用都能获取到剩余的超时时间。
package com.jam.demo.common.context;
import org.springframework.util.StringUtils;
import com.alibaba.ttl.TransmittableThreadLocal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 全链路上下文工具类
* @author ken
*/
public class TraceContextHolder {
private static final TransmittableThreadLocal<Map<String, Object>> CONTEXT_HOLDER = new TransmittableThreadLocal<>() {
@Override
protected Map<String, Object> initialValue() {
return new ConcurrentHashMap<>();
}
};
private static final String TRACE_ID_KEY = "traceId";
private static final String REMAINING_TIMEOUT_KEY = "remainingTimeout";
private static final long DEFAULT_TIMEOUT_MS = 1000L;
private TraceContextHolder() {}
/**
* 设置链路追踪ID
* @param traceId 链路ID
*/
public static void setTraceId(String traceId) {
if (StringUtils.hasText(traceId)) {
CONTEXT_HOLDER.get().put(TRACE_ID_KEY, traceId);
}
}
/**
* 获取链路追踪ID
* @return 链路ID
*/
public static String getTraceId() {
Object traceId = CONTEXT_HOLDER.get().get(TRACE_ID_KEY);
return traceId == null ? "" : traceId.toString();
}
/**
* 设置剩余超时预算
* @param timeoutMs 超时时间,单位毫秒
*/
public static void setRemainingTimeout(long timeoutMs) {
CONTEXT_HOLDER.get().put(REMAINING_TIMEOUT_KEY, Math.max(timeoutMs, 0));
}
/**
* 获取剩余超时预算
* @return 剩余超时时间,单位毫秒
*/
public static long getRemainingTimeout() {
Object timeout = CONTEXT_HOLDER.get().get(REMAINING_TIMEOUT_KEY);
if (timeout == null) {
return DEFAULT_TIMEOUT_MS;
}
return Math.max((long) timeout, 0);
}
/**
* 扣减超时预算
* @param consumeMs 消耗的时间,单位毫秒
*/
public static void deductTimeout(long consumeMs) {
long current = getRemainingTimeout();
setRemainingTimeout(current - consumeMs);
}
/**
* 清除上下文
*/
public static void clear() {
CONTEXT_HOLDER.remove();
}
}
1.3.2 基于AOP的接口超时控制实现
自定义超时注解,通过AOP统一实现接口的超时控制,基于JDK17的CompletableFuture实现异步超时中断。
package com.jam.demo.common.annotation;
import java.lang.annotation.*;
/**
* 接口超时控制注解
* @author ken
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Timeout {
/**
* 超时时间,单位毫秒
*/
long value() default 1000L;
/**
* 是否使用全链路超时预算
*/
boolean useTraceTimeout() default true;
}
package com.jam.demo.common.aspect;
import com.jam.demo.common.annotation.Timeout;
import com.jam.demo.common.context.TraceContextHolder;
import com.jam.demo.common.exception.BizException;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
* 超时控制切面
* @author ken
*/
@Slf4j
@Aspect
@Component
public class TimeoutAspect {
private static final ThreadPoolExecutor TIMEOUT_EXECUTOR = new ThreadPoolExecutor(
10,
50,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r, "timeout-handler-thread");
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
@Around("@annotation(timeout)")
public Object around(ProceedingJoinPoint joinPoint, Timeout timeout) throws Throwable {
long timeoutMs = timeout.useTraceTimeout() ? TraceContextHolder.getRemainingTimeout() : timeout.value();
if (timeoutMs <= 0) {
log.error("链路超时预算耗尽,traceId:{}", TraceContextHolder.getTraceId());
throw new BizException("请求处理超时");
}
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
try {
return joinPoint.proceed();
} catch (Throwable e) {
throw new CompletionException(e);
}
}, TIMEOUT_EXECUTOR);
try {
long start = System.currentTimeMillis();
Object result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
long consume = System.currentTimeMillis() - start;
TraceContextHolder.deductTimeout(consume);
return result;
} catch (TimeoutException e) {
future.cancel(true);
log.error("接口执行超时,方法:{}, 超时时间:{}ms, traceId:{}",
signature.getMethod().getName(), timeoutMs, TraceContextHolder.getTraceId(), e);
throw new BizException("请求处理超时");
} catch (ExecutionException e) {
throw e.getCause();
} finally {
future.cancel(true);
}
}
}
1.3.3 HTTP客户端超时配置
基于Spring Boot 3.x的RestTemplate配置,实现分层的超时控制,确保所有HTTP调用都有明确的超时设置。
package com.jam.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
* RestTemplate配置类
* @author ken
*/
@Configuration
public class RestTemplateConfig {
private static final int CONNECT_TIMEOUT = 1000;
private static final int READ_TIMEOUT = 800;
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
return new RestTemplate(factory);
}
@Bean
public ClientHttpRequestFactory clientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(CONNECT_TIMEOUT);
factory.setReadTimeout(READ_TIMEOUT);
return factory;
}
}
1.4 超时机制的高频避坑点
- 超时时间设置不合理:超时时间小于下游P99响应时间,导致频繁超时;或超时时间设置过长,无法起到故障隔离的作用。
- 非中断式超时:仅捕获超时异常,未中断下游执行线程,导致下游业务继续执行,引发数据不一致。
- 全链路超时未传递:每层调用都设置相同的超时时间,导致上游已经超时返回,下游仍在执行,出现“幽灵请求”。
- 资源层超时大于接口层超时:数据库查询超时、连接池获取连接的超时时间大于接口超时时间,导致接口已经超时,数据库查询仍在执行,占用数据库连接资源。
- 忽略重试场景的超时叠加:未考虑重试带来的总耗时增长,导致多次重试后的总耗时超过全链路超时预算。
二、重试机制:分布式调用的容错核心
2.1 重试的底层逻辑与适用边界
重试机制的核心价值,是应对分布式系统中的临时故障,通过自动重试的方式,提升请求的成功率,避免人工介入。临时故障指的是短时间内可恢复的故障,例如网络抖动、服务节点重启、GC停顿、数据库锁冲突等,这类故障的特点是单次调用失败,短时间内再次调用大概率会成功。 但重试是一把双刃剑,无节制的重试会放大故障,引发重试风暴,最终导致整个集群雪崩。因此,重试机制的设计核心,是明确重试的适用边界,制定严格的重试规范,在提升可用性的同时,避免故障放大。
2.2 重试的核心架构规范
- 仅可重试幂等接口:所有可重试的接口,必须保证幂等性,非幂等的写接口严禁开启自动重试,否则会导致数据重复写入、业务逻辑重复执行。
- 仅可重试可恢复异常:必须严格区分可重试异常与不可重试异常,仅对临时故障导致的异常进行重试。
- 可重试异常:网络超时、连接异常、服务5xx响应、数据库锁等待超时、熔断触发的降级异常等。
- 不可重试异常:参数校验异常、权限不足、业务逻辑异常、数据不存在、4xx响应等,这类异常无论重试多少次,都会失败。
- 严格限制重试次数与总耗时:重试次数必须设置上限,建议最大重试次数不超过3次;重试的总耗时(单次调用超时*重试次数+退避时间)必须小于全链路剩余超时预算,避免重试导致全链路超时。
- 必须使用退避策略:严禁使用无间隔的循环重试,必须使用退避策略,降低重试请求的频率,给下游服务恢复的时间。常用的退避策略包括固定退避、指数退避、随机退避,高并发场景推荐使用指数退避+随机抖动,避免重试流量的集中冲击。
- 重试节点隔离:RPC调用的重试,必须在不同的服务节点上进行,严禁在同一个故障节点上重复重试,避免加重故障节点的负担。
- 熔断兜底:必须配合熔断机制,当下游服务的失败率达到阈值时,触发熔断,停止重试,直接返回降级结果,避免重试风暴。
2.3 重试机制的落地实现
2.3.1 自定义重试注解与AOP实现
基于Spring AOP实现通用的重试切面,支持自定义重试次数、退避策略、可重试异常类型,与全链路超时预算协同。
package com.jam.demo.common.annotation;
import java.lang.annotation.*;
/**
* 重试控制注解
* @author ken
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Retry {
/**
* 最大重试次数
*/
int maxAttempts() default 3;
/**
* 初始退避时间,单位毫秒
*/
long backoffMs() default 100L;
/**
* 退避倍数,指数退避使用
*/
double multiplier() default 2.0;
/**
* 最大退避时间,单位毫秒
*/
long maxBackoffMs() default 1000L;
/**
* 可重试的异常类型
*/
Class<? extends Throwable>[] retryFor() default {Exception.class};
/**
* 不可重试的异常类型
*/
Class<? extends Throwable>[] noRetryFor() default {};
}
package com.jam.demo.common.aspect;
import com.jam.demo.common.annotation.Retry;
import com.jam.demo.common.context.TraceContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.Arrays;
import java.util.Random;
/**
* 重试控制切面
* @author ken
*/
@Slf4j
@Aspect
@Component
public class RetryAspect {
private static final Random RANDOM = new Random();
@Around("@annotation(retry)")
public Object around(ProceedingJoinPoint joinPoint, Retry retry) throws Throwable {
int maxAttempts = retry.maxAttempts();
long backoffMs = retry.backoffMs();
double multiplier = retry.multiplier();
long maxBackoffMs = retry.maxBackoffMs();
Class<? extends Throwable>[] retryFor = retry.retryFor();
Class<? extends Throwable>[] noRetryFor = retry.noRetryFor();
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String methodName = signature.getDeclaringTypeName() + "." + signature.getMethod().getName();
int attemptCount = 0;
Throwable lastException = null;
while (attemptCount < maxAttempts) {
attemptCount++;
try {
return joinPoint.proceed();
} catch (Throwable e) {
lastException = e;
if (!isRetryable(e, retryFor, noRetryFor)) {
log.error("不可重试异常,终止重试,方法:{}, 尝试次数:{}, traceId:{}",
methodName, attemptCount, TraceContextHolder.getTraceId(), e);
throw e;
}
if (attemptCount >= maxAttempts) {
log.error("重试次数耗尽,方法:{}, 最大重试次数:{}, traceId:{}",
methodName, maxAttempts, TraceContextHolder.getTraceId(), e);
break;
}
long sleepTime = calculateBackoffTime(backoffMs, multiplier, attemptCount, maxBackoffMs);
long remainingTimeout = TraceContextHolder.getRemainingTimeout();
if (sleepTime >= remainingTimeout) {
log.error("重试退避时间超过剩余超时预算,终止重试,方法:{}, 退避时间:{}, 剩余超时:{}, traceId:{}",
methodName, sleepTime, remainingTimeout, TraceContextHolder.getTraceId());
break;
}
log.warn("方法执行失败,触发重试,方法:{}, 尝试次数:{}, 退避时间:{}ms, traceId:{}",
methodName, attemptCount, sleepTime, TraceContextHolder.getTraceId(), e);
Thread.sleep(sleepTime);
TraceContextHolder.deductTimeout(sleepTime);
}
}
throw lastException;
}
/**
* 判断异常是否可重试
* @param e 异常对象
* @param retryFor 可重试异常列表
* @param noRetryFor 不可重试异常列表
* @return 是否可重试
*/
private boolean isRetryable(Throwable e, Class<? extends Throwable>[] retryFor, Class<? extends Throwable>[] noRetryFor) {
if (!ObjectUtils.isEmpty(noRetryFor)) {
for (Class<? extends Throwable> clazz : noRetryFor) {
if (clazz.isInstance(e)) {
return false;
}
}
}
if (ObjectUtils.isEmpty(retryFor)) {
return true;
}
for (Class<? extends Throwable> clazz : retryFor) {
if (clazz.isInstance(e)) {
return true;
}
}
return false;
}
/**
* 计算退避时间,指数退避+随机抖动
* @param backoffMs 初始退避时间
* @param multiplier 退避倍数
* @param attemptCount 当前重试次数
* @param maxBackoffMs 最大退避时间
* @return 最终退避时间
*/
private long calculateBackoffTime(long backoffMs, double multiplier, int attemptCount, long maxBackoffMs) {
long baseTime = (long) (backoffMs * Math.pow(multiplier, attemptCount - 1));
long boundedTime = Math.min(baseTime, maxBackoffMs);
long jitter = (long) (boundedTime * 0.1 * RANDOM.nextDouble());
return boundedTime + jitter;
}
}
2.3.2 可重试异常体系定义
package com.jam.demo.common.exception;
/**
* 基础业务异常
* @author ken
*/
public class BizException extends RuntimeException {
private final String code;
public BizException(String message) {
super(message);
this.code = "500";
}
public BizException(String code, String message) {
super(message);
this.code = code;
}
public String getCode() {
return code;
}
}
package com.jam.demo.common.exception;
/**
* 可重试异常
* @author ken
*/
public class RetryableException extends BizException {
public RetryableException(String message) {
super("503", message);
}
public RetryableException(String code, String message) {
super(code, message);
}
}
package com.jam.demo.common.exception;
/**
* 不可重试异常
* @author ken
*/
public class NonRetryableException extends BizException {
public NonRetryableException(String message) {
super("400", message);
}
public NonRetryableException(String code, String message) {
super(code, message);
}
}
2.4 重试机制的高频避坑点
- 非幂等接口开启重试:这是最严重的生产事故隐患,非幂等的写接口重试,会导致订单重复创建、支付重复执行、数据重复插入等严重业务问题。
- 重试风暴:多个上游服务同时对故障下游服务进行重试,重试流量放大数倍,导致下游服务彻底被打挂,故障范围扩大。例如3个上游服务,每个设置3次重试,故障时会产生9倍的流量冲击。
- 无间隔循环重试:未使用退避策略,失败后立即重试,导致下游服务没有恢复时间,故障持续恶化。
- 同一个节点重复重试:RPC调用时,在同一个故障节点上多次重试,加重故障节点的负载,导致节点彻底宕机。
- 重试异常类型错误:对不可重试的业务异常进行重试,例如参数错误、余额不足,不仅无法解决问题,还会浪费系统资源。
- 重试总耗时超过全链路超时:未考虑重试的总耗时,导致多次重试后,上游服务已经超时返回,重试请求仍在执行,引发数据不一致。
三、幂等设计:分布式调用的最终兜底
3.1 幂等的底层逻辑与核心价值
幂等性的数学定义是:f(f(x)) = f(x),即任意多次执行所产生的影响,均与一次执行的影响完全相同。在分布式系统中,幂等性是保证数据一致性的核心基础,无论是重试机制、消息重复投递、用户重复提交,还是网络超时导致的重复请求,最终都需要依靠幂等设计来保证业务数据的正确性。 没有幂等性的保障,重试机制就是灾难,分布式系统的一致性就无从谈起。架构级规范要求:所有的写接口,必须设计幂等性;所有可重试的接口,必须保证幂等性。
3.2 幂等设计的核心架构规范
- 全局唯一幂等键原则:幂等设计的核心是选择全局唯一的幂等键,幂等键必须能够唯一标识一次业务请求,且在整个分布式链路中保持不变。常用的幂等键包括:订单号、流水号、请求唯一ID、用户ID+业务唯一标识组合。
- 先校验后执行原则:所有的幂等校验,必须在业务逻辑执行之前完成,避免业务逻辑重复执行。严禁使用“先执行业务,再判断是否重复”的反向逻辑,否则会导致业务逻辑重复执行,引发数据不一致。
- 并发安全原则:幂等设计必须考虑高并发场景,避免并发请求导致的幂等校验失效。“先查询后插入”的非原子操作,在高并发场景下会出现重复请求穿透的问题,必须使用原子操作保证并发安全。
- 业务解耦原则:幂等校验逻辑必须与业务逻辑解耦,通过架构级的统一组件实现,避免在每个业务方法中重复编写幂等代码,提升代码的可维护性。
- 结果复用原则:重复请求的处理,必须直接返回第一次执行的成功结果,而非返回异常,保证调用方的体验一致性。
- 过期清理原则:幂等数据必须设置过期时间,避免存储无限膨胀,影响系统性能。幂等数据的过期时间,必须大于业务请求的最大重试时间窗口。
3.3 幂等设计的架构级落地实现
3.3.1 数据库表结构设计
基于MySQL8.0设计通用的幂等去重表,支持全业务场景的幂等校验。
CREATE TABLE `idempotent_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`idempotent_key` varchar(128) NOT NULL COMMENT '幂等键,全局唯一',
`business_type` varchar(64) NOT NULL COMMENT '业务类型',
`request_data` text COMMENT '请求数据',
`response_data` text COMMENT '响应结果',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`expire_time` datetime NOT NULL COMMENT '过期时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_idempotent_key` (`idempotent_key`),
KEY `idx_expire_time` (`expire_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='幂等去重表';
3.3.2 基于唯一约束的通用幂等组件实现
基于MyBatisPlus实现通用的幂等服务,采用MySQL唯一约束保证原子性,配合编程式事务实现幂等校验与业务执行的事务一致性。
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 幂等记录实体
* @author ken
*/
@Data
@TableName("idempotent_record")
@Schema(description = "幂等记录实体")
public class IdempotentRecord {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "幂等键")
private String idempotentKey;
@Schema(description = "业务类型")
private String businessType;
@Schema(description = "请求数据")
private String requestData;
@Schema(description = "响应数据")
private String responseData;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "过期时间")
private LocalDateTime expireTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.IdempotentRecord;
import org.apache.ibatis.annotations.Mapper;
/**
* 幂等记录Mapper
* @author ken
*/
@Mapper
public interface IdempotentRecordMapper extends BaseMapper<IdempotentRecord> {
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.entity.IdempotentRecord;
import java.util.function.Supplier;
/**
* 幂等服务接口
* @author ken
*/
public interface IdempotentService extends IService<IdempotentRecord> {
/**
* 幂等执行方法
* @param idempotentKey 幂等键
* @param businessType 业务类型
* @param supplier 业务执行逻辑
* @param expireDays 过期天数
* @return 业务执行结果
*/
<T> T execute(String idempotentKey, String businessType, Supplier<T> supplier, int expireDays);
}
package com.jam.demo.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jam.demo.entity.IdempotentRecord;
import com.jam.demo.mapper.IdempotentRecordMapper;
import com.jam.demo.service.IdempotentService;
import com.jam.demo.common.exception.BizException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.util.function.Supplier;
/**
* 幂等服务实现类
* @author ken
*/
@Slf4j
@Service
public class IdempotentServiceImpl extends ServiceImpl<IdempotentRecordMapper, IdempotentRecord> implements IdempotentService {
private final PlatformTransactionManager transactionManager;
public IdempotentServiceImpl(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
@Override
public <T> T execute(String idempotentKey, String businessType, Supplier<T> supplier, int expireDays) {
if (!StringUtils.hasText(idempotentKey)) {
throw new BizException("幂等键不能为空");
}
if (!StringUtils.hasText(businessType)) {
throw new BizException("业务类型不能为空");
}
if (expireDays <= 0) {
expireDays = 7;
}
IdempotentRecord existRecord = getByIdempotentKey(idempotentKey);
if (!ObjectUtils.isEmpty(existRecord)) {
log.info("重复请求,直接返回历史结果,幂等键:{}, 业务类型:{}", idempotentKey, businessType);
return JSON.parseObject(existRecord.getResponseData(), Object.class);
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionStatus status = transactionManager.getTransaction(def);
try {
IdempotentRecord record = new IdempotentRecord();
record.setIdempotentKey(idempotentKey);
record.setBusinessType(businessType);
record.setCreateTime(LocalDateTime.now());
record.setExpireTime(LocalDateTime.now().plusDays(expireDays));
boolean saveSuccess = save(record);
if (!saveSuccess) {
transactionManager.rollback(status);
existRecord = getByIdempotentKey(idempotentKey);
if (!ObjectUtils.isEmpty(existRecord)) {
log.info("并发请求,幂等校验生效,幂等键:{}, 业务类型:{}", idempotentKey, businessType);
return JSON.parseObject(existRecord.getResponseData(), Object.class);
}
throw new BizException("幂等记录保存失败");
}
T result = supplier.get();
record.setResponseData(JSON.toJSONString(result));
updateById(record);
transactionManager.commit(status);
return result;
} catch (Exception e) {
transactionManager.rollback(status);
log.error("幂等执行异常,幂等键:{}, 业务类型:{}", idempotentKey, businessType, e);
throw e;
}
}
/**
* 根据幂等键查询记录
* @param idempotentKey 幂等键
* @return 幂等记录
*/
private IdempotentRecord getByIdempotentKey(String idempotentKey) {
LambdaQueryWrapper<IdempotentRecord> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(IdempotentRecord::getIdempotentKey, idempotentKey);
return getOne(wrapper);
}
}
3.3.3 基于AOP的注解式幂等实现
自定义幂等注解,通过AOP实现业务代码与幂等逻辑的完全解耦,架构级统一处理。
package com.jam.demo.common.annotation;
import java.lang.annotation.*;
/**
* 幂等控制注解
* @author ken
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
/**
* 幂等键SpEL表达式,用于从请求参数中提取幂等键
*/
String key();
/**
* 业务类型
*/
String businessType();
/**
* 幂等记录过期天数
*/
int expireDays() default 7;
}
package com.jam.demo.common.aspect;
import com.jam.demo.common.annotation.Idempotent;
import com.jam.demo.common.context.TraceContextHolder;
import com.jam.demo.service.IdempotentService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
/**
* 幂等控制切面
* @author ken
*/
@Slf4j
@Aspect
@Component
public class IdempotentAspect {
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
private static final LocalVariableTableParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer();
private final IdempotentService idempotentService;
public IdempotentAspect(IdempotentService idempotentService) {
this.idempotentService = idempotentService;
}
@Around("@annotation(idempotent)")
public Object around(ProceedingJoinPoint joinPoint, Idempotent idempotent) throws Throwable {
String keySpel = idempotent.key();
String businessType = idempotent.businessType();
int expireDays = idempotent.expireDays();
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Object[] args = joinPoint.getArgs();
String[] paramNames = DISCOVERER.getParameterNames(method);
String idempotentKey;
if (StringUtils.hasText(keySpel)) {
EvaluationContext context = new StandardEvaluationContext();
if (paramNames != null && paramNames.length > 0) {
for (int i = 0; i < paramNames.length; i++) {
context.setVariable(paramNames[i], args[i]);
}
}
Expression expression = PARSER.parseExpression(keySpel);
Object keyValue = expression.getValue(context);
if (keyValue == null) {
log.error("幂等键提取失败,SpEL表达式:{}, traceId:{}", keySpel, TraceContextHolder.getTraceId());
throw new IllegalArgumentException("幂等键不能为空");
}
idempotentKey = keyValue.toString();
} else {
idempotentKey = TraceContextHolder.getTraceId();
}
return idempotentService.execute(idempotentKey, businessType, () -> {
try {
return joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}, expireDays);
}
}
3.3.4 其他幂等方案的适用场景与实现
- 基于状态机的幂等:适用于有明确状态流转的业务场景,通过更新时的状态条件判断实现幂等,仅当状态符合预期时才会执行更新,多次执行仅生效一次。
UPDATE `order` SET `status` = 2, `pay_time` = NOW() WHERE `order_no` = #{orderNo} AND `status` = 1;
- 基于乐观锁的幂等:适用于更新操作,通过版本号机制实现,避免并发更新导致的数据不一致,仅当版本号匹配时才会执行更新。
UPDATE `product` SET `stock` = `stock` - 1, `version` = `version` + 1 WHERE `id` = #{id} AND `version` = #{oldVersion} AND `stock` > 0;
- 基于分布式锁的幂等:适用于高并发场景,通过分布式锁保证同一时间只有一个请求执行,配合幂等记录实现结果复用,避免并发请求穿透。
3.4 幂等设计的高频避坑点
- 幂等键选择错误:幂等键不具备全局唯一性,例如仅使用用户ID作为幂等键,导致用户的不同请求被判定为重复请求;或使用时间戳作为幂等键,导致并发请求的幂等键重复。
- 并发场景下的幂等失效:采用“先查询后插入”的非原子操作,高并发场景下,多个请求同时查询到无记录,同时执行业务逻辑,导致重复数据插入。
- 幂等逻辑与业务逻辑强耦合:在每个业务方法中重复编写幂等代码,代码冗余度高,可维护性差,容易出现遗漏和错误。
- 幂等数据无过期机制:幂等记录无限期存储,导致表数据量持续增长,查询性能下降,最终影响系统稳定性。
- 重复请求返回异常:重复请求时直接抛出“重复提交”异常,而非返回第一次执行的成功结果,导致调用方处理逻辑复杂,体验不一致。
- 忽略事务边界:幂等校验与业务逻辑不在同一个事务中,导致幂等校验通过后,事务提交失败,重复请求无法被拦截。
四、三大能力的架构级协同体系
超时、重试、幂等三者并非孤立的技术点,而是一套环环相扣的架构级体系,三者的协同设计,是分布式调用稳定性的核心。
4.1 核心协同规范
- 超时预算是总纲:全链路超时预算是所有设计的总纲,重试的总耗时、单次调用的超时时间,都必须在剩余超时预算范围内,避免超时预算耗尽导致的请求失败。
- 幂等是重试的前提:所有开启重试的接口,必须先保证幂等性,没有幂等保障的重试,绝对禁止开启。
- 重试是超时的补充:超时触发后,只有符合可重试条件的请求,才会触发重试,重试是为了解决超时导致的临时故障,而非替代超时控制。
- 全链路上下文透传:traceId、超时预算、幂等键必须在全链路中透传,确保每一层的调用都能获取到完整的上下文信息,实现协同控制。
4.2 协同落地的业务示例
以订单创建接口为例,完整展示三大能力的协同使用。
package com.jam.demo.controller;
import com.jam.demo.common.annotation.Idempotent;
import com.jam.demo.common.annotation.Retry;
import com.jam.demo.common.annotation.Timeout;
import com.jam.demo.common.context.TraceContextHolder;
import com.jam.demo.common.result.Result;
import com.jam.demo.dto.OrderCreateDTO;
import com.jam.demo.service.OrderService;
import com.jam.demo.common.exception.RetryableException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
* 订单控制器
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/order")
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
private final OrderService orderService;
public OrderController(OrderService orderService) {
this.orderService = orderService;
}
@PostMapping("/create")
@Operation(summary = "创建订单", description = "创建订单接口,支持超时、重试、幂等控制")
@Timeout(value = 1000L, useTraceTimeout = true)
@Idempotent(key = "#orderCreateDTO.orderNo", businessType = "ORDER_CREATE", expireDays = 30)
@Retry(maxAttempts = 3, backoffMs = 100L, multiplier = 2.0, retryFor = {RetryableException.class})
public Result<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO) {
log.info("开始创建订单,订单号:{}, traceId:{}", orderCreateDTO.getOrderNo(), TraceContextHolder.getTraceId());
String orderId = orderService.createOrder(orderCreateDTO);
return Result.success(orderId);
}
}
package com.jam.demo.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
/**
* 订单创建DTO
* @author ken
*/
@Data
@Schema(description = "订单创建请求参数")
public class OrderCreateDTO {
@Schema(description = "订单号,全局唯一", requiredMode = Schema.RequiredMode.REQUIRED)
private String orderNo;
@Schema(description = "用户ID", requiredMode = Schema.RequiredMode.REQUIRED)
private Long userId;
@Schema(description = "商品ID", requiredMode = Schema.RequiredMode.REQUIRED)
private Long productId;
@Schema(description = "购买数量", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer quantity;
@Schema(description = "订单金额", requiredMode = Schema.RequiredMode.REQUIRED)
private BigDecimal amount;
}
五、生产级全链路最佳实践
5.1 全链路可观测性建设
基于traceId实现全链路的日志追踪,对超时率、重试次数、幂等拦截次数、调用成功率等核心指标进行监控,设置对应的告警阈值,及时发现线上问题。核心监控指标包括:接口超时率、重试触发次数、幂等拦截次数、调用成功率、平均响应时间、P99响应时间。
5.2 统一的异常体系与降级策略
明确区分可重试异常与不可重试异常,建立统一的异常处理体系,针对不同的异常类型,制定对应的降级策略。当服务出现故障时,优先返回兜底数据,而非抛出异常,保证系统的可用性。
5.3 熔断机制的配合
配合熔断组件实现熔断机制,当下游服务的失败率达到阈值时,触发熔断,停止重试和调用,直接返回降级结果,避免重试风暴和故障扩大。熔断恢复后,再逐步恢复调用和重试。
5.4 压测与灰度验证
上线前必须针对超时、重试、幂等场景进行全量压测,验证高并发场景下的系统稳定性和幂等有效性。上线时采用灰度发布,先在小流量范围内验证策略的有效性,再逐步全量发布,避免线上故障。
5.5 规范落地与团队共识
将超时、重试、幂等的设计规范,纳入团队的开发规范中,通过代码评审、静态代码扫描等方式,确保规范的落地执行。同时,通过培训和案例分享,让团队所有成员都理解三大能力的底层逻辑和设计规范,从根源上避免分布式调用的稳定性问题。
分布式系统的稳定性,从来都不是靠单点的技术优化,而是靠一套完整的架构级规范体系。超时控制划定了请求的生死线,重试机制提升了临时故障的容错能力,幂等设计保证了数据一致性的最终兜底。只有将三者深度协同,从架构设计阶段就纳入规范,才能从根源上解决分布式调用的稳定性问题,构建出高可用、高可靠的分布式系统。
项目核心依赖配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.6</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.36</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.48</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.1.0-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.27.0</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>