开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中同一个sql同步到mysql少数据,同步到 es 没有问题,什么原因?

"Flink CDC中 同一个sql 同步到mysql 少数据,同步到 es 没有问题,是不是同步到mysql的时候需要做什么特殊处理,同步的sql :SET 'pipeline.name' = 'batch_order_statistics_job';SET 'table.exec.resource.default-parallelism'= '2';SET 'sql-client.execution.result-mode' = 'tableau';-- 订单主表CREATE TABLE if not exists oms_order_source( id bigint COMMENT 'ID', sellerTenantId bigint COMMENT '租户Id', orderNo STRING COMMENT '订单号', orderType tinyint COMMENT '订单类型(1:增值商品/2:普通商品/3:物流)', orderStatus tinyint COMMENT '订单状态()', deptId bigint COMMENT '部门Id', customerId bigint COMMENT '客户id', payableAmount DECIMAL(10, 2) COMMENT '应付金额', creatorId bigint COMMENT '下单人', createTime TIMESTAMP(3) COMMENT '下单时间', createDate as TO_DATE(DATE_FORMAT(createTime,'yyyy-MM-dd')) comment '下单日期', PRIMARY KEY (id) NOT ENFORCED) comment '订单主表映射信息'with ( 'connector' = 'jdbc',-- 'scan.startup.mode'='latest-offset', -- 'server-id'='5400-5408', -- 'hostname' = '192.168.2.252', 'url' = 'jdbc:mysql://xxx:3306/cloud_account_system', 'username' = 'xxx', 'password' = 'xxx', 'sink.parallelism' ='4', 'table-name' = 'oms_order');-- 订单统计CREATE TABLE if not exists dws_order_sync_record_sink( id BIGINT COMMENT '主键', sellerTenantId BIGINT COMMENT '租户Id', customerId BIGINT COMMENT '客户Id', deptId BIGINT COMMENT '部门Id', creatorId BIGINT COMMENT '下单人', createDate DATE COMMENT '下单日期', orderNo STRING COMMENT '订单总量', payableAmount DECIMAL(10, 2) COMMENT '应收金额', PRIMARY KEY (sellerTenantId, deptId, creatorId, createDate) NOT ENFORCED)WITH ('connector' = 'jdbc', -- 'url' = 'jdbc:mysql://192.168.2.252:3306/cloud_account_system', -- 'username' = 'cloud_test', -- 'password' = 'Js81#5q2Pr', 'url' = 'jdbc:mysql://xxxx:3306/cloud_account_system', 'username' = 'xxx', 'password' = 'xxx', -- 'sink.buffer-flush.max-rows'='1000', 'sink.parallelism' ='4', 'table-name' = 'dws_order_sync_record');-- 订单数据统计-- insert into dws_order_sync_record_sinkselect a.id, a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate, a.orderNo, a.payableAmountFROM oms_order_source awhere a.orderType = 2 and a.orderStatus <>4 and a.sellerTenantId<>-1 and a.deptId is not null;
104e64875a00ec441f336df91bccd538.png
不查询,只是打印,数据量是对的,只要插入到mysql 数据就少了,我看了下都是用的主键,都没一做聚合,不应该出现主键一样的情况。"

展开
收起
十一0204 2023-08-09 09:18:49 137 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    同一个 SQL 在 Flink CDC 中同步到 MySQL 会少数据,但同步到 Elasticsearch 没有问题,可能有几个原因导致这种情况:

    数据源和目标数据库配置有差异:请确保源数据和目标数据库的连接配置正确,并且表结构、字段映射和数据类型一致。如果配置有差异,可能导致数据在同步过程中丢失或转换错误。

    CDC 连接和数据捕获设置不正确:Flink CDC 需要正确配置 CDC 连接和数据捕获,以确保能够准确捕获变更数据。请确保 CDC 连接配置正确,并且捕获到的变更数据包含了您期望同步的数据。

    MySQL 配置问题:检查 MySQL 数据库的配置,例如事务隔离级别、日志配置等,确保其与 Flink CDC 的要求兼容。特别关注 MySQL 的事务提交和日志刷新机制,确保数据能够及时提交和持久化到磁盘。

    网络或资源问题:检查网络连接和资源利用情况,确保 Flink CDC 和 MySQL 之间的网络通畅,并且目标数据库的资源(例如 CPU、内存、磁盘)充足,不会导致数据同步延迟或丢失。

    CDC 同步逻辑问题:检查 Flink CDC 的同步逻辑,确保在数据写入 MySQL 之前没有过滤、转换或丢弃任何数据。

    2023-08-11 17:26:15
    赞同 展开评论 打赏
  • 意中人就是我呀!

    你的where过滤条件限制了。主键是否一致,where是否一致,这都是你需要确定的。看看sink端的数据库的union key 是否保持一致。你说的很对,但是你需要自己排查,没人能帮你,这么大数据量丢失,where条件,各个主键在mysql端,source端,sink端口,仔细对比flink的ddl上主键等。此回答整理至钉群“Flink CDC 社区”。

    2023-08-09 12:07:10
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载

    相关镜像