下载地址:http://lanzou.com.cn/i3cfb236a

项目结构
Project Structure
Folder : jisuanxitong
Files : 51
Size : 157.2 KB
Generated: 2026-03-20 18:06:45
jisuanxitong/
├── README.md [178 B]
├── aspect/
│ ├── Adapter.js [2.8 KB]
│ ├── Cache.go [3 KB]
│ ├── Handler.sql [2.3 KB]
│ ├── Loader.java [5.7 KB]
│ ├── Proxy.js [3.1 KB]
│ └── Queue.js [2.8 KB]
├── config/
│ ├── Client.properties [584 B]
│ ├── Controller.xml [1.2 KB]
│ ├── Parser.xml [1.3 KB]
│ ├── Transformer.json [678 B]
│ ├── Transformer.properties [584 B]
│ ├── Validator.json [678 B]
│ └── application.properties [584 B]
├── connectors/
│ ├── Scheduler.ts [2.3 KB]
│ ├── Server.py [3.9 KB]
│ └── Util.php [4 KB]
├── gateway/
│ ├── Builder.cpp [1.4 KB]
│ └── Converter.php [3.2 KB]
├── hooks/
│ ├── Buffer.go [3 KB]
│ ├── Dispatcher.py [6 KB]
│ └── Observer.cpp [1.6 KB]
├── lib/
│ ├── Executor.jar [629 B]
│ └── Repository.jar [629 B]
├── migrations/
│ ├── Controller.php [4.1 KB]
│ ├── Helper.go [4.1 KB]
│ ├── Listener.cpp [1.5 KB]
│ ├── Listener.sql [3.8 KB]
│ └── Service.py [5 KB]
├── package.json [678 B]
├── pom.xml [1.3 KB]
├── response/
│ └── Pool.py [6.2 KB]
├── routes/
│ ├── Builder.go [4.1 KB]
│ ├── Engine.py [4 KB]
│ ├── Manager.js [3.3 KB]
│ ├── Manager.ts [3.1 KB]
│ ├── Provider.js [3.9 KB]
│ ├── Proxy.cpp [1.6 KB]
│ ├── Resolver.ts [3.4 KB]
│ ├── Service.js [3.6 KB]
│ └── Wrapper.ts [2.7 KB]
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── Dispatcher.java [5.4 KB]
│ │ │ ├── Executor.java [4.7 KB]
│ │ │ ├── Pool.java [4.4 KB]
│ │ │ ├── Processor.java [6.4 KB]
│ │ │ ├── Registry.java [7.1 KB]
│ │ │ ├── Resolver.java [7.3 KB]
│ │ │ └── Worker.java [4.2 KB]
│ │ └── resources/
│ └── test/
│ └── java/
└── stubs/
├── Cache.php [3.5 KB]
└── Factory.py [6 KB]
数据一致性:并发扣款不能超扣,必须保证最终一致性
高并发:双11峰值每秒数万笔余额变更
高可用:99.99%的可用性要求
可追溯:每一笔变动都需要完整流水
二、核心领域模型
首先定义清晰的领域模型:
java
/**
 * Account aggregate root.
 *
 * <p>Holds the balance and the frozen portion of it. Every mutating method
 * increments {@code version}, which is declared as the optimistic-lock version number.
 */
@Getter
public class Account {
    private Long accountId;
    private Long userId;
    private Money balance;      // current balance
    private Money frozenAmount; // frozen amount
    private Long version;       // optimistic-lock version number
    private Date updateTime;

    /**
     * Debits the account, refusing to take more than the available amount.
     *
     * @param amount amount to deduct
     * @param bizNo  business order number (not read by this method)
     * @throws InsufficientBalanceException if balance minus frozen amount is less than {@code amount}
     */
    public void debit(Money amount, String bizNo) {
        // Available amount = balance - frozen amount.
        Money available = balance.subtract(frozenAmount);
        if (available.compareTo(amount) < 0) {
            throw new InsufficientBalanceException("余额不足,可用余额:" + available + ",需扣款:" + amount);
        }
        this.balance = balance.subtract(amount);
        this.version++;
        this.updateTime = new Date();
    }

    /**
     * Credits the account.
     *
     * @param amount amount to add to the balance
     * @param bizNo  business order number (not read by this method)
     */
    public void credit(Money amount, String bizNo) {
        this.balance = balance.add(amount);
        this.version++;
        this.updateTime = new Date();
    }

    /**
     * Freezes part of the available amount.
     *
     * @param amount amount to move into the frozen portion
     * @param bizNo  business order number (not read by this method)
     * @throws InsufficientBalanceException if balance minus frozen amount is less than {@code amount}
     */
    public void freeze(Money amount, String bizNo) {
        if (balance.subtract(frozenAmount).compareTo(amount) < 0) {
            throw new InsufficientBalanceException("可冻结余额不足");
        }
        this.frozenAmount = frozenAmount.add(amount);
        this.version++;
    }

    /**
     * Releases a previously frozen amount.
     *
     * @param amount amount to release from the frozen portion
     * @param bizNo  business order number (not read by this method)
     * @throws FrozenAmountMismatchException if {@code amount} exceeds the frozen amount
     */
    public void unfreeze(Money amount, String bizNo) {
        // The throw must live inside the guard; otherwise every call fails and
        // the frozen amount is never released.
        if (frozenAmount.compareTo(amount) < 0) {
            throw new FrozenAmountMismatchException("解冻金额超过冻结金额");
        }
        this.frozenAmount = frozenAmount.subtract(amount);
        this.version++;
    }
}
/**
 * Money value object.
 *
 * <p>The amount is stored as a {@code long} in the minor unit (fen). Arithmetic and
 * comparison are only allowed between values of the same currency.
 */
@Value
public class Money implements Comparable<Money> {
    private final long amount; // unit: fen (minor currency unit)
    private final String currency;

    /**
     * Creates a money value.
     *
     * @param amount   amount in the minor unit
     * @param currency currency code
     * @return a new {@code Money}
     */
    public static Money of(long amount, String currency) {
        return new Money(amount, currency);
    }

    /**
     * Returns the sum of this value and {@code other}.
     *
     * @throws CurrencyMismatchException if the currencies differ
     * @throws ArithmeticException       if the sum overflows a {@code long}
     */
    public Money add(Money other) {
        validateCurrency(other);
        // addExact traps overflow instead of silently wrapping to a wrong amount.
        return new Money(Math.addExact(this.amount, other.amount), this.currency);
    }

    /**
     * Returns this value minus {@code other}.
     *
     * @throws CurrencyMismatchException if the currencies differ
     * @throws ArithmeticException       if the difference overflows a {@code long}
     */
    public Money subtract(Money other) {
        validateCurrency(other);
        return new Money(Math.subtractExact(this.amount, other.amount), this.currency);
    }

    /** Rejects operations that mix two different currencies. */
    private void validateCurrency(Money other) {
        if (!this.currency.equals(other.currency)) {
            throw new CurrencyMismatchException("币种不一致");
        }
    }

    /**
     * Compares by amount.
     *
     * @throws CurrencyMismatchException if the currencies differ
     */
    @Override
    public int compareTo(Money other) {
        validateCurrency(other);
        return Long.compare(this.amount, other.amount);
    }
}
三、事务性流水设计
在阿里,我们坚持 "流水驱动余额" 的设计原则:
java
/**
 * Account ledger entry (described by the article as an immutable object).
 *
 * <p>Instances are created through the Lombok builder; only getters are generated.
 */
@Getter
@Builder
public class AccountTransaction {

    private Long transactionId;

    private Long accountId;

    /** Business order number. */
    private String bizNo;

    /** Business type. */
    private Integer bizType;

    private Money amount;

    /** 1: income, 2: expense, 3: freeze, 4: unfreeze. */
    private Integer direction;

    /** Balance after the change. */
    private Money balanceAfter;

    /** Idempotency id. */
    private String requestId;

    /** Status. */
    private Integer status;

    private Date createTime;

    /**
     * Builds the idempotency key for a business operation.
     *
     * @param bizNo   business order number
     * @param bizType business type
     * @return the business type and business number joined by a colon
     */
    public static String buildIdempotentKey(String bizNo, Integer bizType) {
        StringBuilder key = new StringBuilder();
        key.append(bizType);
        key.append(":");
        key.append(bizNo);
        return key.toString();
    }
}
- 生成幂等键
/**
流水服务(保证幂等)
*/
@Service
@Slf4j
public class TransactionService {@Autowired
private AccountMapper accountMapper;
@Autowired
private TransactionMapper transactionMapper;/**
执行余额变更(幂等)
*/
@Transactional(rollbackFor = Exception.class)
public AccountTransaction executeWithIdempotent(Long accountId, Money amount, Integer direction, String bizNo, Integer bizType) {String idempotentKey = AccountTransaction.buildIdempotentKey(bizNo, bizType);
// 1. 检查幂等
AccountTransaction existTx = transactionMapper.selectByIdempotentKey(idempotentKey);
if (existTx != null) {log.info("幂等命中: {}", idempotentKey); return existTx;}
// 2. 乐观锁更新账户
Account account = accountMapper.selectForUpdate(accountId);Money balanceAfter;
switch (direction) {case 1: // 收入 account.credit(amount, bizNo); balanceAfter = account.getBalance(); break; case 2: // 支出 account.debit(amount, bizNo); balanceAfter = account.getBalance(); break; default: throw new UnsupportedOperationException("不支持的操作类型");}
int updateCount = accountMapper.updateWithVersion(account);
if (updateCount == 0) {throw new OptimisticLockException("账户更新失败,请重试");}
// 3. 记录流水
AccountTransaction transaction = AccountTransaction.builder().accountId(accountId) .bizNo(bizNo) .bizType(bizType) .amount(amount) .direction(direction) .balanceAfter(balanceAfter) .requestId(idempotentKey) .status(1) .createTime(new Date()) .build();transactionMapper.insert(transaction);
return transaction;
}
}
四、高并发下的性能优化
4.1 缓存架构
java
/**
账户缓存设计
*/
@Component
@Slf4j
public class AccountCache {private static final String CACHE_KEY_PREFIX = "acc:";
private static final int CACHE_EXPIRE_SECONDS = 3600;@Autowired
private RedisTemplate redisTemplate;
@Autowired
private AccountMapper accountMapper;/**
两级缓存:本地Caffeine + Redis
*/
private final Cache localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.recordStats()
.build();/**
获取账户(带防缓存穿透)
*/
public Account getAccount(Long accountId) {
String key = CACHE_KEY_PREFIX + accountId;// 1. 本地缓存
Account account = localCache.getIfPresent(key);
if (account != null) {return account;}
// 2. Redis缓存
String json = redisTemplate.opsForValue().get(key);
if (StringUtils.hasText(json)) {account = JSON.parseObject(json, Account.class); localCache.put(key, account); return account;}
// 3. 查询DB(布隆过滤器防穿透)
account = accountMapper.selectById(accountId);
if (account == null) {// 缓存空值 redisTemplate.opsForValue().set(key, "", 60, TimeUnit.SECONDS); return null;}
// 写入缓存
redisTemplate.opsForValue().set(key, JSON.toJSONString(account),CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);localCache.put(key, account);
return account;
}/**
- 缓存失效(更新后主动失效)
*/
public void evict(Long accountId) {
String key = CACHE_KEY_PREFIX + accountId;
localCache.invalidate(key);
redisTemplate.delete(key);
}
}
4.2 异步记账
对于实时性要求不高的场景,采用异步记账提升吞吐量:
java
/**
异步记账处理器
*/
@Component
@Slf4j
public class AsyncAccountingProcessor {@Autowired
private TransactionService transactionService;
@Autowired
private ExecutorService asyncExecutor;/**
异步记账(带重试)
*/
@Retryable(value = {OptimisticLockException.class}, maxAttempts = 3, backoff = @Backoff(delay = 100))
public CompletableFuture asyncDebit(Long accountId, Money amount, String bizNo, Integer bizType) {return CompletableFuture.supplyAsync(() -> {
try { return transactionService.executeWithIdempotent( accountId, amount, 2, bizNo, bizType); } catch (Exception e) { log.error("异步记账失败: accountId={}, bizNo={}", accountId, bizNo, e); throw new AccountingException(e); }}, asyncExecutor);
}/**
批量记账(合并处理)
*/
@Transactional(rollbackFor = Exception.class)
public void batchAccounting(List transactions) {
// 按账户分组,批量更新
Map> groupByAccount = transactions.stream().collect(Collectors.groupingBy(AccountTransaction::getAccountId));for (Map.Entry> entry : groupByAccount.entrySet()) {
Long accountId = entry.getKey(); List<AccountTransaction> txList = entry.getValue(); // 计算净变动 Money netChange = calculateNetChange(txList); // 更新账户 Account account = accountMapper.selectForUpdate(accountId); account.batchUpdate(netChange); accountMapper.updateWithVersion(account); // 批量插入流水 transactionMapper.batchInsert(txList);}
}private Money calculateNetChange(List transactions) {
long netAmount = transactions.stream().mapToLong(tx -> tx.getDirection() == 1 ? tx.getAmount().getAmount() : -tx.getAmount().getAmount()) .sum();return Money.of(netAmount, "CNY");
}
}
五、分布式一致性保证
5.1 TCC模式实现
对于跨多个服务的余额操作,采用TCC分布式事务:
java
/**
 * TCC (Try / Confirm / Cancel) balance operations.
 *
 * <p>NOTE(review): {@code recordTransaction} and {@code Account.confirmFreeze} are
 * called here but are not defined in the code shown — confirm they exist.
 */
@Component
@Slf4j
public class AccountTccService {

    @Autowired
    private AccountMapper accountMapper;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * Try phase: reserves funds by freezing them and writes a TCC record in status TRY.
     *
     * <p>Returns without doing anything if a TCC record already exists for {@code bizNo}.
     *
     * @param accountId account to reserve from
     * @param amount    amount to freeze
     * @param bizNo     business order number
     * @throws OptimisticLockException if the versioned account update affects no row
     */
    @Transactional(rollbackFor = Exception.class)
    public void tryDebit(Long accountId, Money amount, String bizNo) {
        // 1. Idempotency check.
        String tccId = tccIdOf(bizNo);
        if (transactionMapper.existsTccRecord(tccId)) {
            return;
        }

        // 2. Freeze the funds.
        Account account = accountMapper.selectForUpdate(accountId);
        account.freeze(amount, bizNo);
        persistAccount(account);

        // 3. Write the TCC transaction log.
        TccTransaction tccTx = TccTransaction.builder()
                .tccId(tccId)
                .accountId(accountId)
                .amount(amount)
                .status(TccStatus.TRY)
                .createTime(new Date())
                .build();
        transactionMapper.insertTccRecord(tccTx);
    }

    /**
     * Confirm phase: turns the frozen amount into an actual debit.
     *
     * <p>Logs and returns if the TCC record is no longer in status TRY.
     *
     * @param accountId account to debit
     * @param amount    amount to confirm
     * @param bizNo     business order number
     * @throws TccException            if no TCC record exists for {@code bizNo}
     * @throws OptimisticLockException if the versioned account update affects no row
     */
    @Transactional(rollbackFor = Exception.class)
    public void confirmDebit(Long accountId, Money amount, String bizNo) {
        String tccId = tccIdOf(bizNo);
        TccTransaction tccTx = transactionMapper.selectTccRecord(tccId);
        if (tccTx == null) {
            throw new TccException("TCC记录不存在");
        }
        if (tccTx.getStatus() != TccStatus.TRY) {
            log.warn("重复confirm: {}", tccId);
            return;
        }

        // Convert the frozen amount into a real debit.
        Account account = accountMapper.selectForUpdate(accountId);
        account.confirmFreeze(amount);
        persistAccount(account);

        // Update the TCC status.
        tccTx.setStatus(TccStatus.CONFIRM);
        transactionMapper.updateTccStatus(tccTx);

        // Record the formal ledger entry.
        recordTransaction(accountId, amount, bizNo);
    }

    /**
     * Cancel phase: releases the frozen amount.
     *
     * <p>Logs and returns if there is no TCC record or it is not in status TRY.
     *
     * @param accountId account to release funds on
     * @param amount    amount to unfreeze
     * @param bizNo     business order number
     * @throws OptimisticLockException if the versioned account update affects no row
     */
    @Transactional(rollbackFor = Exception.class)
    public void cancelDebit(Long accountId, Money amount, String bizNo) {
        String tccId = tccIdOf(bizNo);
        TccTransaction tccTx = transactionMapper.selectTccRecord(tccId);
        if (tccTx == null || tccTx.getStatus() != TccStatus.TRY) {
            log.warn("无需cancel: {}", tccId);
            return;
        }

        // Unfreeze the funds.
        Account account = accountMapper.selectForUpdate(accountId);
        account.unfreeze(amount, bizNo);
        persistAccount(account);

        // Update the TCC status.
        tccTx.setStatus(TccStatus.CANCEL);
        transactionMapper.updateTccStatus(tccTx);
    }

    /** Builds the TCC record id for a business order number. */
    private static String tccIdOf(String bizNo) {
        return "TCC:" + bizNo;
    }

    /**
     * Persists the account with a version check and fails the surrounding
     * transaction if no row was updated, so the TCC record or status never
     * advances without the matching balance change.
     */
    private void persistAccount(Account account) {
        if (accountMapper.updateWithVersion(account) == 0) {
            throw new OptimisticLockException("账户更新失败,请重试");
        }
    }
}
5.2 对账与补偿
java
/**
日终对账服务
*/
@Component
@Slf4j
public class ReconciliationService {@Autowired
private AccountMapper accountMapper;
@Autowired
private TransactionMapper transactionMapper;/**
对账:校验账户余额 = 所有流水汇总 + 期初余额
*/
public ReconciliationResult reconcile(Long accountId, Date date) {
Account account = accountMapper.selectById(accountId);// 计算当日流水汇总
Money totalFlow = transactionMapper.sumByAccountIdAndDate(accountId, date);// 获取前一日余额
Money previousBalance = getPreviousDayBalance(accountId, date);// 校验
Money expectedBalance = previousBalance.add(totalFlow);
boolean isMatched = account.getBalance().equals(expectedBalance);if (!isMatched) {
log.error("对账不平: accountId={}, 预期={}, 实际={}", accountId, expectedBalance, account.getBalance()); // 触发告警 alertService.sendReconciliationAlert(accountId, expectedBalance, account.getBalance()); // 生成补账任务 createCompensationTask(accountId, expectedBalance, account.getBalance());}
return ReconciliationResult.builder()
.accountId(accountId) .date(date) .expectedBalance(expectedBalance) .actualBalance(account.getBalance()) .isMatched(isMatched) .build();}
/**
自动补账
*/
@Transactional(rollbackFor = Exception.class)
public void compensate(Long accountId, Money diffAmount, String reason) {
log.warn("执行补账: accountId={}, diff={}, reason={}", accountId, diffAmount, reason);Account account = accountMapper.selectForUpdate(accountId);
account.compensate(diffAmount);
accountMapper.updateWithVersion(account);// 记录补账流水
AccountTransaction compensationTx = AccountTransaction.builder().accountId(accountId) .bizNo(UUID.randomUUID().toString()) .bizType(BizType.COMPENSATION) .amount(diffAmount) .direction(diffAmount.getAmount() > 0 ? 1 : 2) .balanceAfter(account.getBalance()) .requestId("COMP:" + System.currentTimeMillis()) .status(1) .createTime(new Date()) .build();transactionMapper.insert(compensationTx);
}
}
六、监控与告警
java
/**
余额监控
*/
@Component
@Slf4j
public class BalanceMonitor {@Autowired
private MeterRegistry meterRegistry;private AtomicLong totalDebitAmount = new AtomicLong();
private AtomicLong totalCreditAmount = new AtomicLong();@PostConstruct
public void init() {// 注册监控指标 Gauge.builder("balance.total.debit", totalDebitAmount, AtomicLong::get) .description("总扣款金额") .register(meterRegistry); Gauge.builder("balance.total.credit", totalCreditAmount, AtomicLong::get) .description("总加款金额") .register(meterRegistry); // 定时检查异常余额 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(this::checkAbnormalBalances, 5, 5, TimeUnit.MINUTES);}
/**
检查异常余额
*/
private void checkAbnormalBalances() {
// 检查负余额
List negativeBalanceAccounts = accountMapper.selectNegativeBalance();
if (!negativeBalanceAccounts.isEmpty()) {log.error("发现负余额账户: {}", negativeBalanceAccounts); alertService.sendAlert("负余额告警", negativeBalanceAccounts);}
// 检查余额超过阈值
Money threshold = Money.of(10_000_000_00L, "CNY"); // 10亿分 = 1000万
List highBalanceAccounts = accountMapper.selectBalanceGreaterThan(threshold);
if (highBalanceAccounts.size() > 100) {log.warn("高余额账户数过多: {}", highBalanceAccounts.size());}
}/**
记录操作耗时
*/
@Around("@annotation(Monitored)")
public Object monitor(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
try {return joinPoint.proceed();} finally {
long duration = System.currentTimeMillis() - start; Timer.Sample sample = Timer.start(meterRegistry); sample.stop(Timer.builder("balance.operation.duration") .tag("method", joinPoint.getSignature().getName()) .register(meterRegistry)); if (duration > 100) { log.warn("余额操作耗时过长: {}ms, method={}", duration, joinPoint.getSignature().getName()); }}
}
}