引言
在互联网业务的高可用体系中,单机房故障可导致全站服务中断,同城双活无法应对城市级灾难,异地多活已成为保障业务连续性的核心方案。
一、异地多活的核心认知与边界定义
1.1 核心定义
异地多活是指跨地理区域部署多个对等的业务单元,每个单元都能独立承接用户流量、完成全链路业务闭环,单个单元发生故障时,其他单元可快速接管流量,保障业务连续可用。
1.2 易混淆概念明确区分
很多架构设计的误区,都源于对核心概念的边界模糊,这里做精准区分:
- 异地备份 vs 异地多活:异地备份是冷备模式,仅做数据离线存储,平时不承接业务流量,故障时RTO(恢复时间目标)通常为小时级,RPO(恢复点目标)通常为分钟级;异地多活所有单元均在线承接流量,故障时RTO可做到秒级,RPO可控制在秒级甚至零丢失。
- 同城双活 vs 异地多活:同城双活部署在同一城市的两个机房,网络延迟低于1ms,可实现强一致数据同步,但无法应对地震、区域停电等城市级故障;异地多活跨城市部署,网络延迟通常在20-50ms,无法实现全域强一致,但可抵御城市级灾难,可用性等级更高。
- 分布式部署 vs 异地多活:分布式部署是将业务拆分为多个服务部署,不强制跨地域,也不具备标准化的容灾能力;异地多活是分布式部署的高阶形态,核心是跨地域的容灾能力、流量调度能力和数据一致性保障。
1.3 异地多活的核心衡量指标
- RTO(恢复时间目标):故障发生后,业务恢复正常服务的最长时间,是衡量容灾能力的核心指标。
- RPO(恢复点目标):故障发生后,系统可恢复到的最近数据时间点,代表故障时丢失的数据量。
- 业务无损率:故障切换过程中,业务正常处理的请求占比,核心业务需做到100%无损。
- 数据一致性:多单元之间数据同步的准确性和时效性,分为强一致性、会话一致性和最终一致性三个等级。
二、异地多活的基石:单元化架构设计
没有合理的单元化设计,异地多活就是空中楼阁。单元化是解决跨地域调用延迟、数据冲突、业务闭环的核心方案。
2.1 单元化的核心本质
将完整的业务系统,按照固定维度拆分为多个独立、对等、闭环的业务单元,每个单元只负责一部分用户的全链路业务处理,单元内的所有服务调用、数据操作都在本地域完成,从根源上杜绝跨地域的业务调用。
2.2 单元化设计的三大黄金原则
这三个原则是单元化架构的生命线,违反任意一个,都会导致架构失效:
- 数据封闭性原则:每个单元仅管理归属本单元的分片数据,所有写操作必须在归属单元内完成,禁止跨单元写数据,从根源上避免数据冲突。
- 单元对等性原则:所有业务单元的部署结构、服务能力、技术栈完全一致,不存在“中心单元”和“边缘单元”的区别,任意一个单元都能承接全量业务流量,故障时可无缝切换。
- 路由一致性原则:对于同一个路由键(如用户ID),无论何时、从何地接入,都必须被路由到同一个归属单元,这是保障数据一致性、避免跨单元调用的核心前提。
2.3 单元化架构分层设计
各分层核心职责:
- 全局DNS/GSLB:负责用户请求的就近接入,将用户流量调度到最近地域的单元接入层。
- 流量调度与路由中心:负责全局路由规则的存储、下发和校验,管理各单元的健康状态,执行故障切流、灰度发布等调度操作。
- 单元接入层:负责本单元的流量接入、路由校验,将匹配本单元的流量转发到内部业务集群,不匹配的流量重定向到归属单元。
- 业务服务集群:单元内的全量业务服务,实现完整的业务闭环,所有服务调用均在单元内完成。
- 数据存储层&中间件集群:单元内独立的数据库、缓存、消息队列等组件,仅为本单元的业务服务提供支持。
2.4 单元化分片策略
分片策略的核心是路由键的选择,直接决定了单元化架构的稳定性,路由键必须满足固定不变、全局唯一、分布均匀三个核心要求。
- 用户ID分片:最主流的分片策略,通过用户ID取模的方式,将用户分配到不同的业务单元。例如3个单元,用户ID%3=0的归属上海单元,=1的归属深圳单元,=2的归属北京单元。该策略天然满足三个核心要求,适合绝大多数ToC业务。
- 地域分片:按照用户的接入地域分配归属单元,例如华东用户归属上海单元,华南用户归属深圳单元。该策略可实现最低的访问延迟,适合对延迟敏感的业务,需配合用户迁移机制,解决用户跨地域流动的问题。
- 业务维度分片:按照业务线拆分单元,例如交易单元、用户单元、内容单元。该策略适合业务线边界清晰的大型系统,通常需配合用户ID分片使用。
2.5 全局数据的处理方案
系统中存在部分不按用户分片的全局数据(如商品信息、基础配置、公共字典),需单独设计处理方案:
- 方案一:中心单元只读模式:将全局数据部署在独立的中心单元,所有业务单元仅可读取本单元的全局数据副本,写操作只能在中心单元完成,写入后通过数据同步机制同步到所有业务单元。该方案适合读多写少的全局数据,可从根源上避免写冲突。
- 方案二:全单元冗余模式:每个业务单元都部署完整的全局数据副本,写操作可在任意单元完成,写入后通过数据同步机制同步到所有单元。该方案适合读写都较为频繁的全局数据,需通过分布式锁、乐观锁等机制解决写冲突问题。
三、异地多活的核心命脉:跨地域数据同步
数据同步是异地多活架构中最难、最容易出问题的环节,直接决定了系统的数据一致性和业务可用性。
3.1 数据同步的核心矛盾:CAP定理的取舍
跨地域网络天然存在分区容错性(P),根据CAP定理,系统无法同时满足一致性(C)和可用性(A),必须做出取舍。
- 异地多活场景下,99%的业务场景都会选择AP模型,优先保障业务可用性,采用最终一致性模型,仅在极少数核心金融场景采用有限的强一致性模型。
- 跨地域强一致性需要通过Raft、Paxos等分布式共识算法实现,单次写入的RT通常会达到100ms以上,仅适合极小数据量、极高一致性要求的场景,不适合通用业务。
3.2 数据一致性模型的精准区分
- 强一致性:写入操作成功后,所有单元的读操作都能立即返回最新的值。跨地域场景下性能损耗极大,仅适合账户余额、金融流水等核心场景。
- 会话一致性:同一个用户的会话内,读操作一定能读到自己最新的写入结果。该模型可通过路由一致性天然实现,只要用户的流量永远路由到归属单元,就能保障会话一致性,是异地多活必须实现的基础模型。
- 最终一致性:写入操作成功后,经过一定的同步延迟,所有单元的数据会达到一致状态。是异地多活的主流方案,兼顾性能和可用性,同步延迟通常可控制在几十到几百毫秒。
3.3 主流数据同步方案对比与落地
方案一:MySQL原生主主复制
基于MySQL原生的主从复制能力,实现双向数据同步,每个单元的MySQL节点既是主节点,也是其他节点的从节点,通过binlog实现数据双向同步。适用场景:单元化拆分彻底,每个单元仅写自己的分片数据,无跨单元写冲突的简单业务场景。
MySQL 8.0 双主复制核心配置: 上海节点my.cnf配置
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
auto-increment-increment=2
auto-increment-offset=1
replicate-do-db=jam_demo
binlog-ignore-db=mysql
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
gtid_mode=ON
enforce_gtid_consistency=ON
深圳节点my.cnf配置
[mysqld]
server-id=2
log-bin=mysql-bin
binlog_format=ROW
auto-increment-increment=2
auto-increment-offset=2
replicate-do-db=jam_demo
binlog-ignore-db=mysql
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
gtid_mode=ON
enforce_gtid_consistency=ON
配置核心说明:
- server-id必须全局唯一,不能重复
- binlog_format必须使用ROW模式,STATEMENT模式跨地域同步会出现数据不一致问题
- auto-increment-increment设置为单元总数,auto-increment-offset设置为本单元的偏移量,避免自增主键冲突
- 开启GTID模式,方便故障切换和同步状态追踪
同步权限配置SQL:
CREATE USER 'repl'@'10.0.%.%' IDENTIFIED WITH mysql_native_password BY 'Repl@123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl'@'10.0.%.%';
FLUSH PRIVILEGES;
双向同步配置SQL(两个节点互相配置):
CHANGE MASTER TO MASTER_HOST='对端节点IP',MASTER_USER='repl',MASTER_PASSWORD='Repl@123456',MASTER_PORT=3306,MASTER_AUTO_POSITION=1;
START SLAVE;
该方案的优缺点:
- 优点:配置简单,MySQL原生支持,同步延迟低,无需额外组件
- 缺点:可控性差,出现数据冲突难以处理,不支持数据过滤和转换,无法应对复杂业务场景
方案二:基于binlog的增量同步(Canal+MQ)
这是生产环境最主流、可控性最高的同步方案。核心原理是:Canal伪装成MySQL的从节点,监听binlog的增量数据,将数据发送到RocketMQ/Kafka,每个单元的消费程序消费MQ中的数据,写入到本地数据库,实现跨地域数据同步。
核心优势:支持灵活的数据过滤、转换、幂等处理,可控性强,出现问题可回溯,支持异构数据源同步,可灵活控制同步的范围和速度。
Canal Server核心配置(conf/example/instance.properties):
canal.instance.master.address=本地MySQL地址:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
canal.instance.defaultDatabaseName=jam_demo
canal.instance.filter.regex=jam_demo\\..*
canal.mq.topic=canal_binlog_sync
canal.mq.partitionHash=${table}.${pk}
3.4 数据同步的核心保障机制
- 幂等性保障:同步消息可能会重复投递,消费代码必须保证幂等性。常用方案是使用binlog文件名+偏移量作为唯一幂等键,或者使用数据主键+更新时间作为幂等标识,重复消息直接跳过处理。
- 顺序性保障:同一个主键的数据变更,必须保证顺序消费,否则会出现数据覆盖的问题。实现方式是通过MQ的哈希分区,将同一个主键的消息发送到同一个分区,保证单分区的顺序消费。
- 异常重试与死信处理:消费失败的消息,执行有限次数的重试,超过重试次数的消息进入死信队列,人工介入处理,避免同步流程阻塞。
- 数据一致性校验:建立定时校验机制,通过数据校验和、全量数据对比等方式,定期对比不同单元之间的数据差异,及时发现同步异常,保障数据一致性。
四、异地多活的神经中枢:流量调度体系
流量调度是异地多活架构的控制核心,负责将正确的流量路由到正确的单元,保障路由一致性、业务可用性和故障快速切换。
4.1 流量调度的全链路流程
4.2 核心路由实现
接入层路由
在Nginx/Ingress层面实现路由规则,根据请求中的用户ID计算归属单元,匹配本单元则转发到内部服务,不匹配则重定向到归属单元的接入地址。
Nginx核心路由配置(基于Lua脚本):
http {
lua_shared_dict unit_route_map 10m;
init_by_lua_block {
local unit_route_map = ngx.shared.unit_route_map
unit_route_map:set("0", "https://sh.demo.com")
unit_route_map:set("1", "https://sz.demo.com")
unit_route_map:set("2", "https://bj.demo.com")
}
server {
listen 80;
server_name demo.com;
set $current_unit "0";
location / {
set $user_id $http_x_user_id;
if ($user_id = "") {
proxy_pass http://business_cluster;
break;
}
set_by_lua_block $unit_code {
local user_id = ngx.var.user_id
local unit_count = 3
return tostring(tonumber(user_id) % unit_count)
}
if ($unit_code != $current_unit) {
set_by_lua_block $redirect_url {
local unit_route_map = ngx.shared.unit_route_map
local unit_code = ngx.var.unit_code
local uri = ngx.var.request_uri
return unit_route_map:get(unit_code) .. uri
}
return 302 $redirect_url;
}
proxy_pass http://business_cluster;
}
}
}
服务层路由校验
在业务服务层面,通过拦截器实现路由校验,防止非法的跨单元调用,保障路由一致性。
单元路由上下文持有类:
package com.jam.demo.common;
/**
* 单元上下文持有类
* @author ken
*/
public class UnitContextHolder {
private static final ThreadLocal<String> UNIT_CODE_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<Long> USER_ID_HOLDER = new ThreadLocal<>();
private UnitContextHolder() {}
public static void setCurrentUnitCode(String unitCode) {
UNIT_CODE_HOLDER.set(unitCode);
}
public static String getCurrentUnitCode() {
return UNIT_CODE_HOLDER.get();
}
public static void setCurrentUserId(Long userId) {
USER_ID_HOLDER.set(userId);
}
public static Long getCurrentUserId() {
return USER_ID_HOLDER.get();
}
public static void clear() {
UNIT_CODE_HOLDER.remove();
USER_ID_HOLDER.remove();
}
}
单元路由拦截器:
package com.jam.demo.config;
import com.jam.demo.common.UnitContextHolder;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;
/**
* 单元路由拦截器
* @author ken
*/
@Slf4j
@Component
public class UnitRouteInterceptor implements HandlerInterceptor {
@Value("${multi.active.current.unit.code}")
private String currentUnitCode;
private static final String USER_ID_HEADER = "X-User-Id";
private static final String UNIT_CODE_HEADER = "X-Unit-Code";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String userId = request.getHeader(USER_ID_HEADER);
if (!StringUtils.hasText(userId)) {
return true;
}
String requestUnitCode = request.getHeader(UNIT_CODE_HEADER);
if (StringUtils.hasText(requestUnitCode) && !currentUnitCode.equals(requestUnitCode)) {
response.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.setHeader("Location", getUnitAddress(requestUnitCode, request));
return false;
}
UnitContextHolder.setCurrentUnitCode(currentUnitCode);
UnitContextHolder.setCurrentUserId(Long.valueOf(userId));
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UnitContextHolder.clear();
}
private String getUnitAddress(String unitCode, HttpServletRequest request) {
return "https://" + unitCode.toLowerCase() + ".demo.com" + request.getRequestURI();
}
}
WebMvc配置类,注册拦截器:
package com.jam.demo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* WebMvc配置类
* @author ken
*/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
private final UnitRouteInterceptor unitRouteInterceptor;
public WebMvcConfig(UnitRouteInterceptor unitRouteInterceptor) {
this.unitRouteInterceptor = unitRouteInterceptor;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(unitRouteInterceptor)
.addPathPatterns("/**")
.excludePathPatterns("/v3/api-docs/**", "/swagger-ui/**", "/swagger-ui.html");
}
}
4.3 全局唯一ID设计
多单元同时写入数据时,必须使用全局唯一ID,避免主键冲突,禁止使用数据库自增主键。这里实现基于号段模式的分布式ID生成器,性能高、无时钟回拨问题、适配异地多活场景。
MySQL号段表结构:
CREATE TABLE IF NOT EXISTS t_id_segment (
id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
biz_type VARCHAR(32) NOT NULL COMMENT '业务类型',
current_max_id BIGINT NOT NULL DEFAULT 0 COMMENT '当前最大ID',
step INT NOT NULL DEFAULT 1000 COMMENT '号段步长',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_biz_type (biz_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分布式ID号段表';
INSERT INTO t_id_segment (biz_type, current_max_id, step) VALUES ('user', 0, 1000), ('order', 0, 1000) ON DUPLICATE KEY UPDATE update_time = CURRENT_TIMESTAMP;
号段模式分布式ID生成器实现:
package com.jam.demo.common;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jam.demo.entity.IdSegment;
import com.jam.demo.mapper.IdSegmentMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
* 号段模式分布式ID生成器
* @author ken
*/
@Slf4j
@Component
public class SegmentIdGenerator {
private final IdSegmentMapper idSegmentMapper;
private final PlatformTransactionManager transactionManager;
private final ConcurrentHashMap<String, IdSegmentHolder> segmentHolderMap = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
public SegmentIdGenerator(IdSegmentMapper idSegmentMapper, PlatformTransactionManager transactionManager) {
this.idSegmentMapper = idSegmentMapper;
this.transactionManager = transactionManager;
}
/**
* 生成分布式ID
* @param bizType 业务类型
* @return 全局唯一ID
*/
public Long generateId(String bizType) {
if (!StringUtils.hasText(bizType)) {
throw new IllegalArgumentException("业务类型不能为空");
}
IdSegmentHolder holder = segmentHolderMap.get(bizType);
if (ObjectUtils.isEmpty(holder) || !holder.usable()) {
lock.lock();
try {
holder = segmentHolderMap.get(bizType);
if (ObjectUtils.isEmpty(holder) || !holder.usable()) {
holder = refreshSegment(bizType);
segmentHolderMap.put(bizType, holder);
}
} finally {
lock.unlock();
}
}
return holder.nextId();
}
/**
* 刷新号段
* @param bizType 业务类型
* @return 号段持有对象
*/
private IdSegmentHolder refreshSegment(String bizType) {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = transactionManager.getTransaction(definition);
try {
LambdaUpdateWrapper<IdSegment> updateWrapper = new LambdaUpdateWrapper<IdSegment>()
.eq(IdSegment::getBizType, bizType)
.setSql("current_max_id = current_max_id + step");
int updateCount = idSegmentMapper.update(null, updateWrapper);
if (updateCount == 0) {
throw new RuntimeException("号段更新失败,业务类型:" + bizType);
}
IdSegment idSegment = idSegmentMapper.selectOne(new LambdaQueryWrapper<IdSegment>().eq(IdSegment::getBizType, bizType));
transactionManager.commit(status);
long maxId = idSegment.getCurrentMaxId();
long step = idSegment.getStep();
return new IdSegmentHolder(maxId - step, maxId);
} catch (Exception e) {
transactionManager.rollback(status);
log.error("号段刷新失败,业务类型:{}", bizType, e);
throw new RuntimeException("号段刷新失败", e);
}
}
/**
* 号段持有对象
*/
private static class IdSegmentHolder {
private final AtomicLong currentId;
private final long maxId;
public IdSegmentHolder(long minId, long maxId) {
this.currentId = new AtomicLong(minId);
this.maxId = maxId;
}
public boolean usable() {
return currentId.get() < maxId;
}
public long nextId() {
return currentId.incrementAndGet();
}
}
}
4.4 流量调度的核心场景
- 正常流量调度:按照地域就近接入原则,将用户流量调度到最近的地域单元,同时通过路由规则保障同一个用户的流量永远路由到归属单元,兼顾访问延迟和路由一致性。
- 故障切流:当某个单元出现故障时,GSLB自动将该单元的流量切换到其他正常单元,同时更新全局路由规则,将故障单元的用户临时路由到备用单元。切流需采用灰度策略,先切10%流量验证,再逐步全量,避免引发雪崩。
- 灰度发布:将新版本的服务部署在指定单元,然后将少量用户的流量切到该单元,验证新版本的稳定性,验证通过后再全量发布到所有单元,大幅降低发布风险。
五、业务实现
5.1 项目基础配置
pom.xml依赖配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>multi-active-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>multi-active-demo</name>
<description>异地多活架构demo</description>
<properties>
<java.version>17</java.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
<fastjson2.version>2.0.52</fastjson2.version>
<rocketmq.version>2.2.5</rocketmq.version>
<canal.version>1.1.7</canal.version>
<guava.version>33.1.0-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.yml配置文件:
spring:
application:
name: multi-active-demo
datasource:
url: jdbc:mysql://127.0.0.1:3306/jam_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: Root@123456
driver-class-name: com.mysql.cj.jdbc.Driver
jackson:
default-property-inclusion: non_null
time-zone: Asia/Shanghai
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: multi-active-producer-group
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
global-config:
db-config:
id-type: input
springdoc:
swagger-ui:
path: /swagger-ui.html
api-docs:
path: /v3/api-docs
packages-to-scan: com.jam.demo.controller
multi:
active:
current:
unit:
code: SH-01
server:
port: 8080
5.2 核心业务表结构
CREATE DATABASE IF NOT EXISTS jam_demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE jam_demo;
CREATE TABLE IF NOT EXISTS t_user (
id BIGINT NOT NULL COMMENT '用户ID',
username VARCHAR(64) NOT NULL COMMENT '用户名',
phone VARCHAR(11) NOT NULL COMMENT '手机号',
unit_code VARCHAR(16) NOT NULL COMMENT '归属单元编码',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_username (username),
UNIQUE KEY uk_phone (phone),
KEY idx_unit_code (unit_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户表';
CREATE TABLE IF NOT EXISTS t_order (
id BIGINT NOT NULL COMMENT '订单ID',
user_id BIGINT NOT NULL COMMENT '用户ID',
order_amount DECIMAL(12,2) NOT NULL COMMENT '订单金额',
order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
unit_code VARCHAR(16) NOT NULL COMMENT '归属单元编码',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
KEY idx_user_id (user_id),
KEY idx_unit_code (unit_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';
5.3 核心业务代码实现
用户实体类:
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户实体类
* @author ken
*/
@Data
@TableName("t_user")
@Schema(description = "用户实体")
public class User {
@TableId(type = IdType.INPUT)
@Schema(description = "用户ID", example = "1000000000001")
private Long id;
@Schema(description = "用户名", example = "jam_user")
private String username;
@Schema(description = "手机号", example = "13800138000")
private String phone;
@Schema(description = "归属单元编码", example = "SH-01")
private String unitCode;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
用户Mapper接口:
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.User;
import org.apache.ibatis.annotations.Mapper;
/**
* 用户Mapper接口
* @author ken
*/
@Mapper
public interface UserMapper extends BaseMapper<User> {
}
用户服务实现类:
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.common.SegmentIdGenerator;
import com.jam.demo.common.UnitContextHolder;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* 用户服务实现类
* @author ken
*/
@Slf4j
@Service
public class UserService {
private final UserMapper userMapper;
private final SegmentIdGenerator segmentIdGenerator;
private final PlatformTransactionManager transactionManager;
public UserService(UserMapper userMapper, SegmentIdGenerator segmentIdGenerator, PlatformTransactionManager transactionManager) {
this.userMapper = userMapper;
this.segmentIdGenerator = segmentIdGenerator;
this.transactionManager = transactionManager;
}
/**
* 创建用户
* @param user 用户信息
* @return 创建成功的用户ID
*/
public Long createUser(User user) {
if (ObjectUtils.isEmpty(user)) {
throw new IllegalArgumentException("用户信息不能为空");
}
if (!StringUtils.hasText(user.getUsername())) {
throw new IllegalArgumentException("用户名不能为空");
}
if (!StringUtils.hasText(user.getPhone())) {
throw new IllegalArgumentException("手机号不能为空");
}
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(definition);
try {
Long userId = segmentIdGenerator.generateId("user");
user.setId(userId);
user.setUnitCode(UnitContextHolder.getCurrentUnitCode());
userMapper.insert(user);
transactionManager.commit(status);
log.info("用户创建成功,用户ID:{},归属单元:{}", userId, user.getUnitCode());
return userId;
} catch (Exception e) {
transactionManager.rollback(status);
log.error("用户创建失败,用户名:{}", user.getUsername(), e);
throw new RuntimeException("用户创建失败", e);
}
}
/**
* 根据用户ID查询用户
* @param userId 用户ID
* @return 用户信息
*/
public User getUserById(Long userId) {
if (ObjectUtils.isEmpty(userId) || userId <= 0) {
throw new IllegalArgumentException("用户ID不能为空");
}
return userMapper.selectById(userId);
}
/**
* 根据单元编码查询用户列表
* @param unitCode 单元编码
* @return 用户列表
*/
public List<User> getUserListByUnitCode(String unitCode) {
if (!StringUtils.hasText(unitCode)) {
throw new IllegalArgumentException("单元编码不能为空");
}
List<User> userList = userMapper.selectList(new LambdaQueryWrapper<User>().eq(User::getUnitCode, unitCode));
if (CollectionUtils.isEmpty(userList)) {
return List.of();
}
return userList;
}
}
用户控制器:
package com.jam.demo.controller;
import com.jam.demo.entity.User;
import com.jam.demo.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* 用户控制器
* @author ken
*/
@RestController
@RequestMapping("/user")
@Tag(name = "用户管理", description = "用户相关操作接口")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@PostMapping("/create")
@Operation(summary = "创建用户", description = "创建新用户,返回用户ID")
public ResponseEntity<Long> createUser(@RequestBody User user) {
Long userId = userService.createUser(user);
return ResponseEntity.ok(userId);
}
@GetMapping("/{userId}")
@Operation(summary = "查询用户", description = "根据用户ID查询用户信息")
public ResponseEntity<User> getUserById(@PathVariable @Parameter(description = "用户ID") Long userId) {
User user = userService.getUserById(userId);
return ResponseEntity.ok(user);
}
@GetMapping("/list")
@Operation(summary = "查询单元用户列表", description = "根据单元编码查询用户列表")
public ResponseEntity<List<User>> getUserListByUnitCode(@RequestParam @Parameter(description = "单元编码") String unitCode) {
List<User> userList = userService.getUserListByUnitCode(unitCode);
return ResponseEntity.ok(userList);
}
}
binlog同步消费者实现:
package com.jam.demo.mq;
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* Canal binlog同步消费者
* @author ken
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "canal_binlog_sync", consumerGroup = "canal_sync_consumer_group")
public class CanalBinlogConsumer implements RocketMQListener<String> {
private final UserMapper userMapper;
private final PlatformTransactionManager transactionManager;
private final ConcurrentHashMap<String, Long> processedMessageMap = new ConcurrentHashMap<>();
public CanalBinlogConsumer(UserMapper userMapper, PlatformTransactionManager transactionManager) {
this.userMapper = userMapper;
this.transactionManager = transactionManager;
}
@Override
public void onMessage(String message) {
try {
CanalEntry.Entry entry = JSON.parseObject(message, CanalEntry.Entry.class);
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
return;
}
String messageKey = entry.getHeader().getLogfileName() + "_" + entry.getHeader().getLogfileOffset();
if (processedMessageMap.containsKey(messageKey)) {
log.info("重复消息,跳过处理,messageKey:{}", messageKey);
return;
}
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
if ("t_user".equals(tableName)) {
handleUserDataChange(eventType, rowChange.getRowDatasList());
}
processedMessageMap.put(messageKey, System.currentTimeMillis());
processedMessageMap.entrySet().removeIf(entry -> System.currentTimeMillis() - entry.getValue() > 24 * 60 * 60 * 1000);
} catch (Exception e) {
log.error("binlog消息处理失败", e);
throw new RuntimeException("binlog消息处理失败", e);
}
}
/**
* 处理用户表数据变更
* @param eventType 事件类型
* @param rowDatas 行数据列表
*/
private void handleUserDataChange(CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {
if (ObjectUtils.isEmpty(rowDatas)) {
return;
}
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(definition);
try {
for (CanalEntry.RowData rowData : rowDatas) {
User user = convertRowDataToUser(rowData.getAfterColumnsList());
switch (eventType) {
case INSERT -> userMapper.insert(user);
case UPDATE -> userMapper.updateById(user);
case DELETE -> userMapper.deleteById(user.getId());
default -> log.info("不支持的事件类型:{}", eventType);
}
}
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
/**
* 行数据转换为用户实体
* @param columns 列数据列表
* @return 用户实体
*/
private User convertRowDataToUser(List<CanalEntry.Column> columns) {
User user = new User();
for (CanalEntry.Column column : columns) {
switch (column.getName()) {
case "id" -> user.setId(Long.valueOf(column.getValue()));
case "username" -> user.setUsername(column.getValue());
case "phone" -> user.setPhone(column.getValue());
case "unit_code" -> user.setUnitCode(column.getValue());
case "create_time" -> user.setCreateTime(java.time.LocalDateTime.parse(column.getValue()));
case "update_time" -> user.setUpdateTime(java.time.LocalDateTime.parse(column.getValue()));
default -> {}
}
}
return user;
}
}
六、落地踩坑指南与最佳实践
6.1 核心踩坑指南
- 路由一致性被破坏:用户流量被错误转发到非归属单元,导致查不到数据。解决方案是建立全链路路由校验机制,从接入层到服务层都要执行路由校验,禁止跨单元写操作。
- 数据同步延迟引发业务异常:用户写入数据后,同步未完成时发生切流,导致新单元查不到数据。解决方案是切流前先完成数据同步校验,确保数据完全同步后再执行切流,切流过程中先禁写、再切流、最后放开写。
- 主键冲突问题:使用数据库自增主键,多单元同时写入导致主键冲突。解决方案是必须使用分布式ID,禁止使用数据库自增主键。
- 跨单元调用导致性能雪崩:单元化拆分不彻底,服务之间存在大量跨单元调用,导致接口RT飙升。解决方案是严格遵守数据封闭性原则,实现单元内业务闭环,杜绝跨单元业务调用。
- 同步幂等性缺失:消息重复消费导致数据被错误覆盖。解决方案是使用binlog文件名+偏移量作为唯一幂等键,确保重复消息不会被重复处理。
6.2 落地最佳实践
- 渐进式拆分:单元化拆分先从核心业务开始,逐步扩展到非核心业务,不要一次性全量拆分,降低落地风险。
- 优先选择用户ID分片:分片策略优先选择用户ID,保证数据分布均匀、路由固定,适配绝大多数业务场景。
- 数据同步优先选择Canal+MQ方案:该方案可控性高、扩展性强,可灵活应对复杂的业务同步需求。
- 灰度切流机制:所有流量切换操作都要采用灰度策略,先切1%、10%、50%的流量,验证无问题后再全量切换,避免引发雪崩。
- 全链路监控体系:建立覆盖数据同步延迟、单元健康度、流量分布、请求成功率、接口RT的全链路监控体系,及时发现和处理异常。
- 定期故障演练:每季度至少执行一次单元故障演练,模拟单元故障场景,验证切流能力和业务恢复能力,确保容灾预案有效。
总结
异地多活不是一个简单的技术组件,而是一套完整的架构体系,涵盖了单元化设计、数据同步、流量调度、监控运维、故障容灾等多个环节。只有严格遵守单元化三大黄金原则,做好数据同步的一致性保障,建立完善的流量调度体系,才能真正实现业务的高可用,抵御从机房级到城市级的各类故障,保障业务的连续稳定运行。