从 0 到 1 落地异地多活:单元化、数据同步与流量调度的核心壁垒全击穿

简介: 本文系统阐述异地多活架构核心实践:定义其为跨地域对等单元、独立闭环、秒级容灾的高可用方案;详解单元化设计三大原则(数据封闭、单元对等、路由一致);剖析数据同步(Canal+MQ为主)与流量调度(GSLB+路由校验)关键技术;并提供ID生成、分片策略及落地避坑指南。

引言

在互联网业务的高可用体系中,单机房故障可导致全站服务中断,同城双活无法应对城市级灾难,异地多活已成为保障业务连续性的核心方案。

一、异地多活的核心认知与边界定义

1.1 核心定义

异地多活是指跨地理区域部署多个对等的业务单元,每个单元都能独立承接用户流量、完成全链路业务闭环,单个单元发生故障时,其他单元可快速接管流量,保障业务连续可用。

1.2 易混淆概念明确区分

很多架构设计的误区,都源于对核心概念的边界模糊,这里做精准区分:

  • 异地备份 vs 异地多活:异地备份是冷备模式,仅做数据离线存储,平时不承接业务流量,故障时RTO(恢复时间目标)通常为小时级,RPO(恢复点目标)通常为分钟级;异地多活所有单元均在线承接流量,故障时RTO可做到秒级,RPO可控制在秒级甚至零丢失。
  • 同城双活 vs 异地多活:同城双活部署在同一城市的两个机房,网络延迟低于1ms,可实现强一致数据同步,但无法应对地震、区域停电等城市级故障;异地多活跨城市部署,网络延迟通常在20-50ms,无法实现全域强一致,但可抵御城市级灾难,可用性等级更高。
  • 分布式部署 vs 异地多活:分布式部署是将业务拆分为多个服务部署,不强制跨地域,也不具备标准化的容灾能力;异地多活是分布式部署的高阶形态,核心是跨地域的容灾能力、流量调度能力和数据一致性保障。

1.3 异地多活的核心衡量指标

  • RTO(恢复时间目标):故障发生后,业务恢复正常服务的最长时间,是衡量容灾能力的核心指标。
  • RPO(恢复点目标):故障发生后,系统可恢复到的最近数据时间点,代表故障时丢失的数据量。
  • 业务无损率:故障切换过程中,业务正常处理的请求占比,核心业务需做到100%无损。
  • 数据一致性:多单元之间数据同步的准确性和时效性,分为强一致性、会话一致性和最终一致性三个等级。

二、异地多活的基石:单元化架构设计

没有合理的单元化设计,异地多活就是空中楼阁。单元化是解决跨地域调用延迟、数据冲突、业务闭环的核心方案。

2.1 单元化的核心本质

将完整的业务系统,按照固定维度拆分为多个独立、对等、闭环的业务单元,每个单元只负责一部分用户的全链路业务处理,单元内的所有服务调用、数据操作都在本地域完成,从根源上杜绝跨地域的业务调用。

2.2 单元化设计的三大黄金原则

这三个原则是单元化架构的生命线,违反任意一个,都会导致架构失效:

  1. 数据封闭性原则:每个单元仅管理归属本单元的分片数据,所有写操作必须在归属单元内完成,禁止跨单元写数据,从根源上避免数据冲突。
  2. 单元对等性原则:所有业务单元的部署结构、服务能力、技术栈完全一致,不存在“中心单元”和“边缘单元”的区别,任意一个单元都能承接全量业务流量,故障时可无缝切换。
  3. 路由一致性原则:对于同一个路由键(如用户ID),无论何时、从何地接入,都必须被路由到同一个归属单元,这是保障数据一致性、避免跨单元调用的核心前提。

2.3 单元化架构分层设计

各分层核心职责:

  • 全局DNS/GSLB:负责用户请求的就近接入,将用户流量调度到最近地域的单元接入层。
  • 流量调度与路由中心:负责全局路由规则的存储、下发和校验,管理各单元的健康状态,执行故障切流、灰度发布等调度操作。
  • 单元接入层:负责本单元的流量接入、路由校验,将匹配本单元的流量转发到内部业务集群,不匹配的流量重定向到归属单元。
  • 业务服务集群:单元内的全量业务服务,实现完整的业务闭环,所有服务调用均在单元内完成。
  • 数据存储层&中间件集群:单元内独立的数据库、缓存、消息队列等组件,仅为本单元的业务服务提供支持。

2.4 单元化分片策略

分片策略的核心是路由键的选择,直接决定了单元化架构的稳定性,路由键必须满足固定不变、全局唯一、分布均匀三个核心要求。

  1. 用户ID分片:最主流的分片策略,通过用户ID取模的方式,将用户分配到不同的业务单元。例如3个单元,用户ID%3=0的归属上海单元,=1的归属深圳单元,=2的归属北京单元。该策略天然满足三个核心要求,适合绝大多数ToC业务。
  2. 地域分片:按照用户的接入地域分配归属单元,例如华东用户归属上海单元,华南用户归属深圳单元。该策略可实现最低的访问延迟,适合对延迟敏感的业务,需配合用户迁移机制,解决用户跨地域流动的问题。
  3. 业务维度分片:按照业务线拆分单元,例如交易单元、用户单元、内容单元。该策略适合业务线边界清晰的大型系统,通常需配合用户ID分片使用。

2.5 全局数据的处理方案

系统中存在部分不按用户分片的全局数据(如商品信息、基础配置、公共字典),需单独设计处理方案:

  • 方案一:中心单元只读模式:将全局数据部署在独立的中心单元,所有业务单元仅可读取本单元的全局数据副本,写操作只能在中心单元完成,写入后通过数据同步机制同步到所有业务单元。该方案适合读多写少的全局数据,可从根源上避免写冲突。
  • 方案二:全单元冗余模式:每个业务单元都部署完整的全局数据副本,写操作可在任意单元完成,写入后通过数据同步机制同步到所有单元。该方案适合读写都较为频繁的全局数据,需通过分布式锁、乐观锁等机制解决写冲突问题。

三、异地多活的核心命脉:跨地域数据同步

数据同步是异地多活架构中最难、最容易出问题的环节,直接决定了系统的数据一致性和业务可用性。

3.1 数据同步的核心矛盾:CAP定理的取舍

跨地域网络天然存在分区容错性(P),根据CAP定理,系统无法同时满足一致性(C)和可用性(A),必须做出取舍。

  • 异地多活场景下,99%的业务场景都会选择AP模型,优先保障业务可用性,采用最终一致性模型,仅在极少数核心金融场景采用有限的强一致性模型。
  • 跨地域强一致性需要通过Raft、Paxos等分布式共识算法实现,单次写入的RT通常会达到100ms以上,仅适合极小数据量、极高一致性要求的场景,不适合通用业务。

3.2 数据一致性模型的精准区分

  • 强一致性:写入操作成功后,所有单元的读操作都能立即返回最新的值。跨地域场景下性能损耗极大,仅适合账户余额、金融流水等核心场景。
  • 会话一致性:同一个用户的会话内,读操作一定能读到自己最新的写入结果。该模型可通过路由一致性天然实现,只要用户的流量永远路由到归属单元,就能保障会话一致性,是异地多活必须实现的基础模型。
  • 最终一致性:写入操作成功后,经过一定的同步延迟,所有单元的数据会达到一致状态。是异地多活的主流方案,兼顾性能和可用性,同步延迟通常可控制在几十到几百毫秒。

3.3 主流数据同步方案对比与落地

方案一:MySQL原生主主复制

基于MySQL原生的主从复制能力,实现双向数据同步,每个单元的MySQL节点既是主节点,也是其他节点的从节点,通过binlog实现数据双向同步。适用场景:单元化拆分彻底,每个单元仅写自己的分片数据,无跨单元写冲突的简单业务场景。

MySQL 8.0 双主复制核心配置: 上海节点my.cnf配置

[mysqld]

server-id=1

log-bin=mysql-bin

binlog_format=ROW

auto-increment-increment=2

auto-increment-offset=1

replicate-do-db=jam_demo

binlog-ignore-db=mysql

binlog-ignore-db=information_schema

binlog-ignore-db=performance_schema

gtid_mode=ON

enforce_gtid_consistency=ON

深圳节点my.cnf配置

[mysqld]

server-id=2

log-bin=mysql-bin

binlog_format=ROW

auto-increment-increment=2

auto-increment-offset=2

replicate-do-db=jam_demo

binlog-ignore-db=mysql

binlog-ignore-db=information_schema

binlog-ignore-db=performance_schema

gtid_mode=ON

enforce_gtid_consistency=ON

配置核心说明:

  • server-id必须全局唯一,不能重复
  • binlog_format必须使用ROW模式,STATEMENT模式跨地域同步会出现数据不一致问题
  • auto-increment-increment设置为单元总数,auto-increment-offset设置为本单元的偏移量,避免自增主键冲突
  • 开启GTID模式,方便故障切换和同步状态追踪

同步权限配置SQL:

CREATE USER 'repl'@'10.0.%.%' IDENTIFIED WITH mysql_native_password BY 'Repl@123456';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl'@'10.0.%.%';
FLUSH PRIVILEGES;

双向同步配置SQL(两个节点互相配置):

CHANGE MASTER TO MASTER_HOST='对端节点IP',MASTER_USER='repl',MASTER_PASSWORD='Repl@123456',MASTER_PORT=3306,MASTER_AUTO_POSITION=1;
START SLAVE;

该方案的优缺点:

  • 优点:配置简单,MySQL原生支持,同步延迟低,无需额外组件
  • 缺点:可控性差,出现数据冲突难以处理,不支持数据过滤和转换,无法应对复杂业务场景

方案二:基于binlog的增量同步(Canal+MQ)

这是生产环境最主流、可控性最高的同步方案。核心原理是:Canal伪装成MySQL的从节点,监听binlog的增量数据,将数据发送到RocketMQ/Kafka,每个单元的消费程序消费MQ中的数据,写入到本地数据库,实现跨地域数据同步。

核心优势:支持灵活的数据过滤、转换、幂等处理,可控性强,出现问题可回溯,支持异构数据源同步,可灵活控制同步的范围和速度。

Canal Server核心配置(conf/example/instance.properties):

canal.instance.master.address=本地MySQL地址:3306

canal.instance.dbUsername=canal

canal.instance.dbPassword=Canal@123456

canal.instance.defaultDatabaseName=jam_demo

canal.instance.filter.regex=jam_demo\\..*

canal.mq.topic=canal_binlog_sync

canal.mq.partitionHash=${table}.${pk}

3.4 数据同步的核心保障机制

  1. 幂等性保障:同步消息可能会重复投递,消费代码必须保证幂等性。常用方案是使用binlog文件名+偏移量作为唯一幂等键,或者使用数据主键+更新时间作为幂等标识,重复消息直接跳过处理。
  2. 顺序性保障:同一个主键的数据变更,必须保证顺序消费,否则会出现数据覆盖的问题。实现方式是通过MQ的哈希分区,将同一个主键的消息发送到同一个分区,保证单分区的顺序消费。
  3. 异常重试与死信处理:消费失败的消息,执行有限次数的重试,超过重试次数的消息进入死信队列,人工介入处理,避免同步流程阻塞。
  4. 数据一致性校验:建立定时校验机制,通过数据校验和、全量数据对比等方式,定期对比不同单元之间的数据差异,及时发现同步异常,保障数据一致性。

四、异地多活的神经中枢:流量调度体系

流量调度是异地多活架构的控制核心,负责将正确的流量路由到正确的单元,保障路由一致性、业务可用性和故障快速切换。

4.1 流量调度的全链路流程

4.2 核心路由实现

接入层路由

在Nginx/Ingress层面实现路由规则,根据请求中的用户ID计算归属单元,匹配本单元则转发到内部服务,不匹配则重定向到归属单元的接入地址。

Nginx核心路由配置(基于Lua脚本):

http {

   lua_shared_dict unit_route_map 10m;

   init_by_lua_block {

       local unit_route_map = ngx.shared.unit_route_map

       unit_route_map:set("0", "https://sh.demo.com")

       unit_route_map:set("1", "https://sz.demo.com")

       unit_route_map:set("2", "https://bj.demo.com")

   }

   server {

       listen 80;

       server_name demo.com;

       set $current_unit "0";

       location / {

           set $user_id $http_x_user_id;

           if ($user_id = "") {

               proxy_pass http://business_cluster;

               break;

           }

           set_by_lua_block $unit_code {

               local user_id = ngx.var.user_id

               local unit_count = 3

               return tostring(tonumber(user_id) % unit_count)

           }

           if ($unit_code != $current_unit) {

               set_by_lua_block $redirect_url {

                   local unit_route_map = ngx.shared.unit_route_map

                   local unit_code = ngx.var.unit_code

                   local uri = ngx.var.request_uri

                   return unit_route_map:get(unit_code) .. uri

               }

               return 302 $redirect_url;

           }

           proxy_pass http://business_cluster;

       }

   }

}

服务层路由校验

在业务服务层面,通过拦截器实现路由校验,防止非法的跨单元调用,保障路由一致性。

单元路由上下文持有类:

package com.jam.demo.common;

/**
* 单元上下文持有类
* @author ken
*/

public class UnitContextHolder {

   private static final ThreadLocal<String> UNIT_CODE_HOLDER = new ThreadLocal<>();
   private static final ThreadLocal<Long> USER_ID_HOLDER = new ThreadLocal<>();

   private UnitContextHolder() {}

   public static void setCurrentUnitCode(String unitCode) {
       UNIT_CODE_HOLDER.set(unitCode);
   }

   public static String getCurrentUnitCode() {
       return UNIT_CODE_HOLDER.get();
   }

   public static void setCurrentUserId(Long userId) {
       USER_ID_HOLDER.set(userId);
   }

   public static Long getCurrentUserId() {
       return USER_ID_HOLDER.get();
   }

   public static void clear() {
       UNIT_CODE_HOLDER.remove();
       USER_ID_HOLDER.remove();
   }
}

单元路由拦截器:

package com.jam.demo.config;

import com.jam.demo.common.UnitContextHolder;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;

/**
* 单元路由拦截器
* @author ken
*/

@Slf4j
@Component
public class UnitRouteInterceptor implements HandlerInterceptor {

   @Value("${multi.active.current.unit.code}")
   private String currentUnitCode;

   private static final String USER_ID_HEADER = "X-User-Id";
   private static final String UNIT_CODE_HEADER = "X-Unit-Code";

   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
       String userId = request.getHeader(USER_ID_HEADER);
       if (!StringUtils.hasText(userId)) {
           return true;
       }
       String requestUnitCode = request.getHeader(UNIT_CODE_HEADER);
       if (StringUtils.hasText(requestUnitCode) && !currentUnitCode.equals(requestUnitCode)) {
           response.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
           response.setHeader("Location", getUnitAddress(requestUnitCode, request));
           return false;
       }
       UnitContextHolder.setCurrentUnitCode(currentUnitCode);
       UnitContextHolder.setCurrentUserId(Long.valueOf(userId));
       return true;
   }

   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
       UnitContextHolder.clear();
   }

   private String getUnitAddress(String unitCode, HttpServletRequest request) {
       return "https://" + unitCode.toLowerCase() + ".demo.com" + request.getRequestURI();
   }
}

WebMvc配置类,注册拦截器:

package com.jam.demo.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
* WebMvc配置类
* @author ken
*/

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

   private final UnitRouteInterceptor unitRouteInterceptor;

   public WebMvcConfig(UnitRouteInterceptor unitRouteInterceptor) {
       this.unitRouteInterceptor = unitRouteInterceptor;
   }

   @Override
   public void addInterceptors(InterceptorRegistry registry) {
       registry.addInterceptor(unitRouteInterceptor)
               .addPathPatterns("/**")
               .excludePathPatterns("/v3/api-docs/**", "/swagger-ui/**", "/swagger-ui.html");
   }
}

4.3 全局唯一ID设计

多单元同时写入数据时,必须使用全局唯一ID,避免主键冲突,禁止使用数据库自增主键。这里实现基于号段模式的分布式ID生成器,性能高、无时钟回拨问题、适配异地多活场景。

MySQL号段表结构:

CREATE TABLE IF NOT EXISTS t_id_segment (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   biz_type VARCHAR(32) NOT NULL COMMENT '业务类型',
   current_max_id BIGINT NOT NULL DEFAULT 0 COMMENT '当前最大ID',
   step INT NOT NULL DEFAULT 1000 COMMENT '号段步长',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_biz_type (biz_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分布式ID号段表';

INSERT INTO t_id_segment (biz_type, current_max_id, step) VALUES ('user', 0, 1000), ('order', 0, 1000) ON DUPLICATE KEY UPDATE update_time = CURRENT_TIMESTAMP;

号段模式分布式ID生成器实现:

package com.jam.demo.common;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jam.demo.entity.IdSegment;
import com.jam.demo.mapper.IdSegmentMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
* 号段模式分布式ID生成器
* @author ken
*/

@Slf4j
@Component
public class SegmentIdGenerator {

   private final IdSegmentMapper idSegmentMapper;

   private final PlatformTransactionManager transactionManager;

   private final ConcurrentHashMap<String, IdSegmentHolder> segmentHolderMap = new ConcurrentHashMap<>();

   private final ReentrantLock lock = new ReentrantLock();

   public SegmentIdGenerator(IdSegmentMapper idSegmentMapper, PlatformTransactionManager transactionManager) {
       this.idSegmentMapper = idSegmentMapper;
       this.transactionManager = transactionManager;
   }

   /**
    * 生成分布式ID
    * @param bizType 业务类型
    * @return 全局唯一ID
    */

   public Long generateId(String bizType) {
       if (!StringUtils.hasText(bizType)) {
           throw new IllegalArgumentException("业务类型不能为空");
       }
       IdSegmentHolder holder = segmentHolderMap.get(bizType);
       if (ObjectUtils.isEmpty(holder) || !holder.usable()) {
           lock.lock();
           try {
               holder = segmentHolderMap.get(bizType);
               if (ObjectUtils.isEmpty(holder) || !holder.usable()) {
                   holder = refreshSegment(bizType);
                   segmentHolderMap.put(bizType, holder);
               }
           } finally {
               lock.unlock();
           }
       }
       return holder.nextId();
   }

   /**
    * 刷新号段
    * @param bizType 业务类型
    * @return 号段持有对象
    */

   private IdSegmentHolder refreshSegment(String bizType) {
       DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
       definition.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
       TransactionStatus status = transactionManager.getTransaction(definition);
       try {
           LambdaUpdateWrapper<IdSegment> updateWrapper = new LambdaUpdateWrapper<IdSegment>()
                   .eq(IdSegment::getBizType, bizType)
                   .setSql("current_max_id = current_max_id + step");
           int updateCount = idSegmentMapper.update(null, updateWrapper);
           if (updateCount == 0) {
               throw new RuntimeException("号段更新失败,业务类型:" + bizType);
           }
           IdSegment idSegment = idSegmentMapper.selectOne(new LambdaQueryWrapper<IdSegment>().eq(IdSegment::getBizType, bizType));
           transactionManager.commit(status);
           long maxId = idSegment.getCurrentMaxId();
           long step = idSegment.getStep();
           return new IdSegmentHolder(maxId - step, maxId);
       } catch (Exception e) {
           transactionManager.rollback(status);
           log.error("号段刷新失败,业务类型:{}", bizType, e);
           throw new RuntimeException("号段刷新失败", e);
       }
   }

   /**
    * 号段持有对象
    */

   private static class IdSegmentHolder {
       private final AtomicLong currentId;
       private final long maxId;

       public IdSegmentHolder(long minId, long maxId) {
           this.currentId = new AtomicLong(minId);
           this.maxId = maxId;
       }

       public boolean usable() {
           return currentId.get() < maxId;
       }

       public long nextId() {
           return currentId.incrementAndGet();
       }
   }
}

4.4 流量调度的核心场景

  1. 正常流量调度:按照地域就近接入原则,将用户流量调度到最近的地域单元,同时通过路由规则保障同一个用户的流量永远路由到归属单元,兼顾访问延迟和路由一致性。
  2. 故障切流:当某个单元出现故障时,GSLB自动将该单元的流量切换到其他正常单元,同时更新全局路由规则,将故障单元的用户临时路由到备用单元。切流需采用灰度策略,先切10%流量验证,再逐步全量,避免引发雪崩。
  3. 灰度发布:将新版本的服务部署在指定单元,然后将少量用户的流量切到该单元,验证新版本的稳定性,验证通过后再全量发布到所有单元,大幅降低发布风险。

五、业务实现

5.1 项目基础配置

pom.xml依赖配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.4</version>
       <relativePath/>
   </parent>
   <groupId>com.jam</groupId>
   <artifactId>multi-active-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>multi-active-demo</name>
   <description>异地多活架构demo</description>
   <properties>
       <java.version>17</java.version>
       <mybatis-plus.version>3.5.7</mybatis-plus.version>
       <fastjson2.version>2.0.52</fastjson2.version>
       <rocketmq.version>2.2.5</rocketmq.version>
       <canal.version>1.1.7</canal.version>
       <guava.version>33.1.0-jre</guava.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-jdbc</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-validation</artifactId>
       </dependency>
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>1.18.30</version>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>2.5.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-spring-boot-starter</artifactId>
           <version>${rocketmq.version}</version>
       </dependency>
       <dependency>
           <groupId>com.alibaba.otter</groupId>
           <artifactId>canal.client</artifactId>
           <version>${canal.version}</version>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

application.yml配置文件:

spring:
 application:
   name: multi-active-demo
 datasource:
   url: jdbc:mysql://127.0.0.1:3306/jam_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
   username: root
   password: Root@123456
   driver-class-name: com.mysql.cj.jdbc.Driver
 jackson:
   default-property-inclusion: non_null
   time-zone: Asia/Shanghai

rocketmq:
 name-server: 127.0.0.1:9876
 producer:
   group: multi-active-producer-group

mybatis-plus:
 configuration:
   map-underscore-to-camel-case: true
 global-config:
   db-config:
     id-type: input

springdoc:
 swagger-ui:
   path: /swagger-ui.html
 api-docs:
   path: /v3/api-docs
 packages-to-scan: com.jam.demo.controller

multi:
 active:
   current:
     unit:
       code: SH-01

server:
 port: 8080

5.2 核心业务表结构

CREATE DATABASE IF NOT EXISTS jam_demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE jam_demo;

CREATE TABLE IF NOT EXISTS t_user (
   id BIGINT NOT NULL COMMENT '用户ID',
   username VARCHAR(64) NOT NULL COMMENT '用户名',
   phone VARCHAR(11) NOT NULL COMMENT '手机号',
   unit_code VARCHAR(16) NOT NULL COMMENT '归属单元编码',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_username (username),
   UNIQUE KEY uk_phone (phone),
   KEY idx_unit_code (unit_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户表';

CREATE TABLE IF NOT EXISTS t_order (
   id BIGINT NOT NULL COMMENT '订单ID',
   user_id BIGINT NOT NULL COMMENT '用户ID',
   order_amount DECIMAL(12,2) NOT NULL COMMENT '订单金额',
   order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
   unit_code VARCHAR(16) NOT NULL COMMENT '归属单元编码',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   KEY idx_user_id (user_id),
   KEY idx_unit_code (unit_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';

5.3 核心业务代码实现

用户实体类:

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
* 用户实体类
* @author ken
*/

@Data
@TableName("t_user")
@Schema(description = "用户实体")
public class User {

   @TableId(type = IdType.INPUT)
   @Schema(description = "用户ID", example = "1000000000001")
   private Long id;

   @Schema(description = "用户名", example = "jam_user")
   private String username;

   @Schema(description = "手机号", example = "13800138000")
   private String phone;

   @Schema(description = "归属单元编码", example = "SH-01")
   private String unitCode;

   @Schema(description = "创建时间")
   private LocalDateTime createTime;

   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

用户Mapper接口:

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.User;
import org.apache.ibatis.annotations.Mapper;

/**
* 用户Mapper接口
* @author ken
*/

@Mapper
public interface UserMapper extends BaseMapper<User> {
}

用户服务实现类:

package com.jam.demo.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.common.SegmentIdGenerator;
import com.jam.demo.common.UnitContextHolder;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.util.List;

/**
* 用户服务实现类
* @author ken
*/

@Slf4j
@Service
public class UserService {

   private final UserMapper userMapper;

   private final SegmentIdGenerator segmentIdGenerator;

   private final PlatformTransactionManager transactionManager;

   public UserService(UserMapper userMapper, SegmentIdGenerator segmentIdGenerator, PlatformTransactionManager transactionManager) {
       this.userMapper = userMapper;
       this.segmentIdGenerator = segmentIdGenerator;
       this.transactionManager = transactionManager;
   }

   /**
    * 创建用户
    * @param user 用户信息
    * @return 创建成功的用户ID
    */

   public Long createUser(User user) {
       if (ObjectUtils.isEmpty(user)) {
           throw new IllegalArgumentException("用户信息不能为空");
       }
       if (!StringUtils.hasText(user.getUsername())) {
           throw new IllegalArgumentException("用户名不能为空");
       }
       if (!StringUtils.hasText(user.getPhone())) {
           throw new IllegalArgumentException("手机号不能为空");
       }
       DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
       TransactionStatus status = transactionManager.getTransaction(definition);
       try {
           Long userId = segmentIdGenerator.generateId("user");
           user.setId(userId);
           user.setUnitCode(UnitContextHolder.getCurrentUnitCode());
           userMapper.insert(user);
           transactionManager.commit(status);
           log.info("用户创建成功,用户ID:{},归属单元:{}", userId, user.getUnitCode());
           return userId;
       } catch (Exception e) {
           transactionManager.rollback(status);
           log.error("用户创建失败,用户名:{}", user.getUsername(), e);
           throw new RuntimeException("用户创建失败", e);
       }
   }

   /**
    * 根据用户ID查询用户
    * @param userId 用户ID
    * @return 用户信息
    */

   public User getUserById(Long userId) {
       if (ObjectUtils.isEmpty(userId) || userId <= 0) {
           throw new IllegalArgumentException("用户ID不能为空");
       }
       return userMapper.selectById(userId);
   }

   /**
    * 根据单元编码查询用户列表
    * @param unitCode 单元编码
    * @return 用户列表
    */

   public List<User> getUserListByUnitCode(String unitCode) {
       if (!StringUtils.hasText(unitCode)) {
           throw new IllegalArgumentException("单元编码不能为空");
       }
       List<User> userList = userMapper.selectList(new LambdaQueryWrapper<User>().eq(User::getUnitCode, unitCode));
       if (CollectionUtils.isEmpty(userList)) {
           return List.of();
       }
       return userList;
   }
}

用户控制器:

package com.jam.demo.controller;

import com.jam.demo.entity.User;
import com.jam.demo.service.UserService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
* 用户控制器
* @author ken
*/

@RestController
@RequestMapping("/user")
@Tag(name = "用户管理", description = "用户相关操作接口")
public class UserController {

   private final UserService userService;

   public UserController(UserService userService) {
       this.userService = userService;
   }

   @PostMapping("/create")
   @Operation(summary = "创建用户", description = "创建新用户,返回用户ID")
   public ResponseEntity<Long> createUser(@RequestBody User user) {
       Long userId = userService.createUser(user);
       return ResponseEntity.ok(userId);
   }

   @GetMapping("/{userId}")
   @Operation(summary = "查询用户", description = "根据用户ID查询用户信息")
   public ResponseEntity<User> getUserById(@PathVariable @Parameter(description = "用户ID") Long userId) {
       User user = userService.getUserById(userId);
       return ResponseEntity.ok(user);
   }

   @GetMapping("/list")
   @Operation(summary = "查询单元用户列表", description = "根据单元编码查询用户列表")
   public ResponseEntity<List<User>> getUserListByUnitCode(@RequestParam @Parameter(description = "单元编码") String unitCode) {
       List<User> userList = userService.getUserListByUnitCode(unitCode);
       return ResponseEntity.ok(userList);
   }
}

binlog同步消费者实现:

package com.jam.demo.mq;

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
* Canal binlog同步消费者
* @author ken
*/

@Slf4j
@Component
@RocketMQMessageListener(topic = "canal_binlog_sync", consumerGroup = "canal_sync_consumer_group")
public class CanalBinlogConsumer implements RocketMQListener<String> {

   private final UserMapper userMapper;

   private final PlatformTransactionManager transactionManager;

   private final ConcurrentHashMap<String, Long> processedMessageMap = new ConcurrentHashMap<>();

   public CanalBinlogConsumer(UserMapper userMapper, PlatformTransactionManager transactionManager) {
       this.userMapper = userMapper;
       this.transactionManager = transactionManager;
   }

   @Override
   public void onMessage(String message) {
       try {
           CanalEntry.Entry entry = JSON.parseObject(message, CanalEntry.Entry.class);
           if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
               return;
           }
           String messageKey = entry.getHeader().getLogfileName() + "_" + entry.getHeader().getLogfileOffset();
           if (processedMessageMap.containsKey(messageKey)) {
               log.info("重复消息,跳过处理,messageKey:{}", messageKey);
               return;
           }
           CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
           String tableName = entry.getHeader().getTableName();
           CanalEntry.EventType eventType = rowChange.getEventType();
           if ("t_user".equals(tableName)) {
               handleUserDataChange(eventType, rowChange.getRowDatasList());
           }
           processedMessageMap.put(messageKey, System.currentTimeMillis());
           processedMessageMap.entrySet().removeIf(entry -> System.currentTimeMillis() - entry.getValue() > 24 * 60 * 60 * 1000);
       } catch (Exception e) {
           log.error("binlog消息处理失败", e);
           throw new RuntimeException("binlog消息处理失败", e);
       }
   }

   /**
    * 处理用户表数据变更
    * @param eventType 事件类型
    * @param rowDatas 行数据列表
    */

   private void handleUserDataChange(CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDatas) {
       if (ObjectUtils.isEmpty(rowDatas)) {
           return;
       }
       DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
       TransactionStatus status = transactionManager.getTransaction(definition);
       try {
           for (CanalEntry.RowData rowData : rowDatas) {
               User user = convertRowDataToUser(rowData.getAfterColumnsList());
               switch (eventType) {
                   case INSERT -> userMapper.insert(user);
                   case UPDATE -> userMapper.updateById(user);
                   case DELETE -> userMapper.deleteById(user.getId());
                   default -> log.info("不支持的事件类型:{}", eventType);
               }
           }
           transactionManager.commit(status);
       } catch (Exception e) {
           transactionManager.rollback(status);
           throw e;
       }
   }

   /**
    * 行数据转换为用户实体
    * @param columns 列数据列表
    * @return 用户实体
    */

   private User convertRowDataToUser(List<CanalEntry.Column> columns) {
       User user = new User();
       for (CanalEntry.Column column : columns) {
           switch (column.getName()) {
               case "id" -> user.setId(Long.valueOf(column.getValue()));
               case "username" -> user.setUsername(column.getValue());
               case "phone" -> user.setPhone(column.getValue());
               case "unit_code" -> user.setUnitCode(column.getValue());
               case "create_time" -> user.setCreateTime(java.time.LocalDateTime.parse(column.getValue()));
               case "update_time" -> user.setUpdateTime(java.time.LocalDateTime.parse(column.getValue()));
               default -> {}
           }
       }
       return user;
   }
}

六、落地踩坑指南与最佳实践

6.1 核心踩坑指南

  1. 路由一致性被破坏:用户流量被错误转发到非归属单元,导致查不到数据。解决方案是建立全链路路由校验机制,从接入层到服务层都要执行路由校验,禁止跨单元写操作。
  2. 数据同步延迟引发业务异常:用户写入数据后,同步未完成时发生切流,导致新单元查不到数据。解决方案是切流前先完成数据同步校验,确保数据完全同步后再执行切流,切流过程中先禁写、再切流、最后放开写。
  3. 主键冲突问题:使用数据库自增主键,多单元同时写入导致主键冲突。解决方案是必须使用分布式ID,禁止使用数据库自增主键。
  4. 跨单元调用导致性能雪崩:单元化拆分不彻底,服务之间存在大量跨单元调用,导致接口RT飙升。解决方案是严格遵守数据封闭性原则,实现单元内业务闭环,杜绝跨单元业务调用。
  5. 同步幂等性缺失:消息重复消费导致数据被错误覆盖。解决方案是使用binlog文件名+偏移量作为唯一幂等键,确保重复消息不会被重复处理。

6.2 落地最佳实践

  1. 渐进式拆分:单元化拆分先从核心业务开始,逐步扩展到非核心业务,不要一次性全量拆分,降低落地风险。
  2. 优先选择用户ID分片:分片策略优先选择用户ID,保证数据分布均匀、路由固定,适配绝大多数业务场景。
  3. 数据同步优先选择Canal+MQ方案:该方案可控性高、扩展性强,可灵活应对复杂的业务同步需求。
  4. 灰度切流机制:所有流量切换操作都要采用灰度策略,先切1%、10%、50%的流量,验证无问题后再全量切换,避免引发雪崩。
  5. 全链路监控体系:建立覆盖数据同步延迟、单元健康度、流量分布、请求成功率、接口RT的全链路监控体系,及时发现和处理异常。
  6. 定期故障演练:每季度至少执行一次单元故障演练,模拟单元故障场景,验证切流能力和业务恢复能力,确保容灾预案有效。

总结

异地多活不是一个简单的技术组件,而是一套完整的架构体系,涵盖了单元化设计、数据同步、流量调度、监控运维、故障容灾等多个环节。只有严格遵守单元化三大黄金原则,做好数据同步的一致性保障,建立完善的流量调度体系,才能真正实现业务的高可用,抵御从机房级到城市级的各类故障,保障业务的连续稳定运行。

目录
相关文章
|
2天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
10252 35
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
14天前
|
人工智能 安全 Linux
【OpenClaw保姆级图文教程】阿里云/本地部署集成模型Ollama/Qwen3.5/百炼 API 步骤流程及避坑指南
2026年,AI代理工具的部署逻辑已从“单一云端依赖”转向“云端+本地双轨模式”。OpenClaw(曾用名Clawdbot)作为开源AI代理框架,既支持对接阿里云百炼等云端免费API,也能通过Ollama部署本地大模型,完美解决两类核心需求:一是担心云端API泄露核心数据的隐私安全诉求;二是频繁调用导致token消耗过高的成本控制需求。
5943 14
|
22天前
|
人工智能 JavaScript Ubuntu
5分钟上手龙虾AI!OpenClaw部署(阿里云+本地)+ 免费多模型配置保姆级教程(MiniMax、Claude、阿里云百炼)
OpenClaw(昵称“龙虾AI”)作为2026年热门的开源个人AI助手,由PSPDFKit创始人Peter Steinberger开发,核心优势在于“真正执行任务”——不仅能聊天互动,还能自动处理邮件、管理日程、订机票、写代码等,且所有数据本地处理,隐私完全可控。它支持接入MiniMax、Claude、GPT等多类大模型,兼容微信、Telegram、飞书等主流聊天工具,搭配100+可扩展技能,成为兼顾实用性与隐私性的AI工具首选。
23220 120
|
8天前
|
人工智能 JavaScript API
解放双手!OpenClaw Agent Browser全攻略(阿里云+本地部署+免费API+网页自动化场景落地)
“让AI聊聊天、写代码不难,难的是让它自己打开网页、填表单、查数据”——2026年,无数OpenClaw用户被这个痛点困扰。参考文章直击核心:当AI只能“纸上谈兵”,无法实际操控浏览器,就永远成不了真正的“数字员工”。而Agent Browser技能的出现,彻底打破了这一壁垒——它给OpenClaw装上“上网的手和眼睛”,让AI能像真人一样打开网页、点击按钮、填写表单、提取数据,24小时不间断完成网页自动化任务。
1960 4