【转】微服务MySQL分库分表数据到MongoDB同步方案

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

需求背景

近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

发现问题

微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:

image

解决思路

  • 要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询

  • 为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库

  • NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据

  • 业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB

根据以上思路,总结数据整合架构如下图所示:

image

解决方案

目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步

先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

  • 数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高

  • 对于数据库中存量数据需要单独处理

  • 对于工具表还需要单独维护同步

  • 每次新增数据表都需要重新添加MQ逻辑

考虑到以上问题,用MQ方式同步数据并不是最优解决办法

使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

canal

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。
canal原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

canal介绍: https://github.com/alibaba/canal/wiki

我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

目录:

conf
    database1
        -instance.properties
    database2
        -instance.properties
    canal.properties

instance.properties

canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =

canal.properties

canal.id= 1
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.detecting.enable = true
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size =  1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.destinations= example,p4-test
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring 
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署数据流如下:

image

tip:
虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog

数据同步

创建canal client应用订阅canal读取的binlog数据

1.开启多instance 订阅,订阅多个instance

public void initCanalStart() {
    List<String> destinations = canalProperties.getDestination();
    final List<CanalClient> canalClientList = new ArrayList<>();
    if (destinations != null && destinations.size() > 0) {
        for (String destination : destinations) {
            // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }
    });
}

订阅消息处理

private void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    saveEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

  • insert :schemaName,tableName,beforeColumnsList

  • update :schemaName,tableName,afterColumnsList

  • delete :schemaName,tableName,afterColumnsList

RowChange rowChage = null;
    try {
        rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    EventType eventType = rowChage.getEventType();
    logger.info(row_format,
            entry.getHeader().getLogfileName(),
            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
            entry.getHeader().getTableName(), eventType,
            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
        logger.info(" sql ----> " + rowChage.getSql());
        continue;
    }
    DataService dataService = SpringUtil.getBean(DataService.class);
    for (RowData rowData : rowChage.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.INSERT) {
            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.UPDATE) {
            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else {
            logger.info("未知数据变动类型:{}", eventType);
        }
    }
}

ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

public static DBObject columnToJson(List<CanalEntry.Column> columns) {
    DBObject obj = new BasicDBObject();
    try {
        for (CanalEntry.Column column : columns) {
            String mysqlType = column.getMysqlType();
            //int类型,长度11以下为Integer,以上为long
            if (mysqlType.startsWith("int")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                    if (length > 10) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
            } else if (mysqlType.startsWith("bigint")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
            } else if (mysqlType.startsWith("decimal")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenCenter = mysqlType.indexOf(',');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                    if (length == 0) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("date")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("time")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
            } else {
                obj.put(column.getName(), column.getValue());
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return obj;
}

tip:
DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本

数据拼接

我们获取了数据库数据后做拼接操作,比如两张用户表:

user_info:{id,user_no,user_name,user_password}
user_other_info:{id,user_no,idcard,realname}

拼接后mongo数据为:

user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

先实现@Controller,定义名称为Schema,value对应schemaName

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public  @interface Schema {
 String value() default "";
}

然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public  @interface Table {
    String value() default "";
    CanalEntry.EventType[] event() default {};
}

然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

private static ApplicationContext applicationContext = null;
//库名和数据处理Bean映射Map
private static Map<String, Object> instanceMap = new HashMap<String, Object>();
//路劲和数据处理Method映射Map
private static Map<String, Method> handlerMap = new HashMap<String, Method>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    if (SpringUtil.applicationContext == null) {
        SpringUtil.applicationContext = applicationContext;
        //初始化instanceMap数据
        instanceMap();
        //初始化handlerMap数据
        handlerMap();
    }
}
private void instanceMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Object instance = applicationContext.getBean(clazz);
        Schema schema = clazz.getAnnotation(Schema.class);
        String key = schema.value();
        instanceMap.put(key, instance);
        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
    }
}
private void handlerMap() {
    if (instanceMap.size() <= 0)
        return;
    for (Map.Entry<String, Object> entry : instanceMap.entrySet()) {
        if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) {
            Schema schema = entry.getValue().getClass().getAnnotation(Schema.class);
            String schemeName = schema.value();
            Method[] methods = entry.getValue().getClass().getMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(Table.class)) {
                    Table table = method.getAnnotation(Table.class);
                    String tName = table.value();
                    CanalEntry.EventType[] events = table.event();
                    //未标明数据事件类型的方法不做映射
                    if (events.length < 1) {
                        continue;
                    }
                    //同一个方法可以映射多张表
                    for (int i = 0; i < events.length; i++) {
                        String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber();
                        handlerMap.put(path, method);
                        logger.info("handlerMap [{}:{}]", path, method.getName());
                    }
                } else {
                    continue;
                }
            }
        } else {
            continue;
        }
    }
}

调用方法:

public static void doEvent(String path, DBObject obj) throws Exception {
    String[] pathArray = path.split("/");
    if (pathArray.length != 4) {
        logger.info("path 格式不正确:{}", path);
        return;
    }
    Method method = handlerMap.get(path);
    Object schema = instanceMap.get(pathArray[1]);
    //查找不到映射Bean和Method不做处理
    if (method == null || schema == null) {
        return;
    }
    try {
        long begin = System.currentTimeMillis();
        logger.info("integrate data:{},{}", path, obj);
        method.invoke(schema, new Object[]{obj});
        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
    } catch (Exception e) {
        logger.error("调用组合逻辑异常", e);
        throw new Exception(e.getCause());
    }
}

数据拼接消息处理:

@Schema("demo_user")
public class UserService {
    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
    public void saveUser_UserInfo(DBObject userInfo) {
        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
        DBCollection collection = completeMongoTemplate.getCollection("user");
        DBObject queryObject = new BasicDBObject("user_no", userNo);
        DBObject user = collection.findOne(queryObject);
        if (user == null) {
            user = new BasicDBObject();
            user.put("user_no", userNo);
            user.put("userInfo", userInfo);
            collection.insert(user);
        } else {
            DBObject updateObj = new BasicDBObject("userInfo", userInfo);
            DBObject update = new BasicDBObject("$set", updateObj);
            collection.update(queryObject, update);
        }
    }
}

示例源码

https://github.com/zhangtr/canal-mongo

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
1月前
|
存储 SQL 关系型数据库
Mysql高可用架构方案
本文阐述了Mysql高可用架构方案,介绍了 主从模式,MHA模式,MMM模式,MGR模式 方案的实现方式,没有哪个方案是完美的,开发人员在选择何种方案应用到项目中也没有标准答案,合适的才是最好的。
162 3
Mysql高可用架构方案
|
2月前
|
存储 NoSQL MongoDB
数据的存储--MongoDB文档存储(二)
数据的存储--MongoDB文档存储(二)
77 2
|
2月前
|
存储 关系型数据库 MySQL
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB区别,适用场景
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB——特点、性能、扩展性、安全性、适用场景比较
|
3月前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
1天前
|
存储 缓存 关系型数据库
【MySQL进阶篇】存储引擎(MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案)
MySQL的存储引擎是其核心组件之一,负责数据的存储、索引和检索。不同的存储引擎具有不同的功能和特性,可以根据业务需求 选择合适的引擎。本文详细介绍了MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案。
【MySQL进阶篇】存储引擎(MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案)
|
10天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
1月前
|
关系型数据库 MySQL
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
73 5
|
1月前
|
关系型数据库 MySQL
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
41 1
|
3月前
|
存储 SQL 关系型数据库
【MySQL调优】如何进行MySQL调优?从参数、数据建模、索引、SQL语句等方向,三万字详细解读MySQL的性能优化方案(2024版)
MySQL调优主要分为三个步骤:监控报警、排查慢SQL、MySQL调优。 排查慢SQL:开启慢查询日志 、找出最慢的几条SQL、分析查询计划 。 MySQL调优: 基础优化:缓存优化、硬件优化、参数优化、定期清理垃圾、使用合适的存储引擎、读写分离、分库分表; 表设计优化:数据类型优化、冷热数据分表等。 索引优化:考虑索引失效的11个场景、遵循索引设计原则、连接查询优化、排序优化、深分页查询优化、覆盖索引、索引下推、用普通索引等。 SQL优化。
643 15
【MySQL调优】如何进行MySQL调优?从参数、数据建模、索引、SQL语句等方向,三万字详细解读MySQL的性能优化方案(2024版)
|
3月前
|
存储 SQL 关系型数据库
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案
MySQL如何进行分库分表、数据迁移?从相关概念、使用场景、拆分方式、分表字段选择、数据一致性校验等角度阐述MySQL数据库的分库分表方案。
527 15
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案

推荐镜像

更多