Mysql5.7使用DTS增量同步数据到MaxCompute

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本文介绍了使用适应增量同步Mysql5.7数据同步到MaxCompute同步方式,主要过程为设置mysql5.7的binlog,以及使用DTS同步Mysql同步所有数据之前的全量数据操作,以及增量数据同步的配置,以及最后整合最后数据之前的全部数据。

背景:一些客户反馈,增量同步数据到MaxCompute按照全天的数据做增量数据同步数据量太大,且不使用按天的增量同步数据,进行在MaxCompute上进行数据处理得出增量数据对于delete的相关数据不能做到很好的支持,在次给大家一个对增量数据同步的方案使用DTS做增量同步数据到MaxCompute,数据源为ECS上自建的mysql5.7。

一、为自建MySQL创建账号并设置

1.1登陆自建Mysql数据库

image.png

1.2创建mysql数据库中用于数据迁移/同步的账号

CREATE USER 'dtsmigration'@'%' IDENTIFIED BY 'Dts123456';

说明:

  • username:待创建的账号。
  • host:允许该账号登录的主机,如果允许该账号从任意主机登录数据库,可以使用百分号(%)。
  • password:账号的密码。

1.3对账号进行授权操作

GRANT privileges ON databasename.tablename TO 'username'@'host' WITH GRANT OPTION;




说明:

  • privileges:授予该账号的操作权限,如SELECT、INSERT、UPDATE等,如果要授予该账号所有权限,则使用ALL。
  • databasename:数据库名。如果要授予该账号具备所有数据库的操作权限,则使用星号(*)。
  • tablename:表名。如果要授予该账号具备所有表的操作权限,则使用星号(*)。
  • username:待授权的账号。
  • host:允许该账号登录的主机,如果允许该账号从任意主机登录,则使用百分号(%)。
  • WITH GRANT OPTION:授予该账号使用GRANT命令的权限,该参数为可选。


如果要给账户赋予所有数据库和表的权限,并容许从任意主机登陆数据库

GRANT ALL ON *.* TO 'dtsmigration'@'%';

1.4开启并设置自建Mysql数据库binlog

到指定目录下找到该文件
image.png

a.使用vim命令,修改配置文件my.cnf中的如下参数

log_bin=mysql_bin
binlog_format=row
server_id=2 //设置大于1的整数
binlog_row_image=full //当自建MySQL的版本大于5.6时,则必须设置该项。


image.png

b.修改完成后,重启Mysql进程。

image.png

service mysqld restart

image.png

二、同步过程介绍

2.1结构初始化

DTS将源库中待同步表的结构定义信息同步至MaxCompute中,初始化时DTS会为表名增加_base后缀。例如源表为customer,那么MaxCompute中的表即为customer_base。

2.2全量数据初始化

DTS将源库中待同步表的存量数据,全部同步至MaxCompute中的目标表名_base表中(例如从源库的customer表同步至MaxCompute的customer_base表),作为后续增量同步数据的基线数据。

2.3增量数据同步

DTS在MaxCompute中创建一个增量日志表,表名为同步的目标表名_log,例如customer_log,然后将源库产生的增量数据实时同步到该表中。

三、增量同步实践

3.1购买DTS同步


image.png

3.2查看购买的DTS同步,点击配置同步链路

image.png

3.3配置对应的数据源和相应的MaxCompute项目


image.png

3.4点击授予权限的同步账号操作

image.png

3.5选择对应的增量同步数据的同步实践,并选择需要同步的表

image.png


3.6同步配置预检查

image.png

3.7查询同步的全量数据

image.png

3.8查看同步成功的增量数据分区user_log

image.png

3.9查看增量数据同步的数据

image.png


元数据的字段介绍

字段 说明
record_id 增量日志的记录id,为该日志唯一标识。
说明
- id的值唯一且递增。
- 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录,且record_id的值相同。
operation_flag 操作类型,取值:
- I:INSERT操作。
- D:DELETE操作。
- U:UPDATE操作。
utc_timestamp 操作时间戳,即binlog的时间戳(UTC 时间)。
before_flag 所有列的值是否为更新前的值,取值:Y或N。
after_flag 所有列的值是否为更新后的值,取值:Y或N。

四、根据时间点位,整合该时间点位之前的全量数据

4.1建立全量数据表

CREATE TABLE IF NOT EXISTS maxcomputeone_dev.user_all(uid BIGINT,uname STRING,deptno BIGINT,gender STRING,optime DATETIME,record_id BIGINT,operation_flag STRING,utc_timestamp BIGINT,before_flag STRING,after_flag STRING);

4.2查看增量数据最后同步的点位,最后整合全量数据到user_all

image.png


合并语句

set odps.sql.allow.fullscan=true;
insert overwrite table user_all
select uid,
       uname,
       deptno,
       gender,
       optime
  from(
select row_number() over(partition by t.uid
 order by record_id desc, after_flag desc) as record_num, record_id, operation_flag, after_flag, uid, uname, deptno,gender,optime
  from(
select incr.record_id, incr.operation_flag, incr.after_flag, incr.uid, incr.uname,incr.deptno,incr.gender,incr.optime
  from user_log incr
 where utc_timestamp <= 1585107804
 union all
select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.uid, base.uname,base.deptno,base.gender,base.optime
  from user_base base) t) gt
where record_num=1 
  and after_flag='Y';

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
4月前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
661 0
|
6天前
|
SQL 监控 关系型数据库
MySQL原理简介—12.MySQL主从同步
本文介绍了四种为MySQL搭建主从复制架构的方法:异步复制、半同步复制、GTID复制和并行复制。异步复制通过配置主库和从库实现简单的主从架构,但存在数据丢失风险;半同步复制确保日志复制到从库后再提交事务,提高了数据安全性;GTID复制简化了配置过程,增强了复制的可靠性和管理性;并行复制通过多线程技术降低主从同步延迟,保证数据一致性。此外,还讨论了如何使用工具监控主从延迟及应对策略,如强制读主库以确保即时读取最新数据。
MySQL原理简介—12.MySQL主从同步
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
186 17
|
2月前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
180 9
|
5月前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
491 11
|
4月前
|
SQL 存储 关系型数据库
Mysql主从同步 清理二进制日志的技巧
Mysql主从同步 清理二进制日志的技巧
49 1
|
5月前
|
NoSQL 安全 容灾
阿里云DTS踩坑经验分享系列|Redis迁移、同步
阿里云数据传输服务DTS在帮助用户迁移Redis数据、同步数据时,在某些复杂场景下会出现报错,或者源库与目标库数据不一致的问题,给用户带来困扰。本文介绍了DTS Redis到Redis迁移、同步过程中的典型问题,以帮助用户更好地使用DTS。
348 2
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
24天前
|
关系型数据库 MySQL 数据库连接
数据库连接工具连接mysql提示:“Host ‘172.23.0.1‘ is not allowed to connect to this MySQL server“
docker-compose部署mysql8服务后,连接时提示不允许连接问题解决

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 推荐镜像

    更多