"Flink CDC中 同一个sql 同步到mysql 少数据,同步到 es 没有问题,是不是同步到mysql的时候需要做什么特殊处理,同步的sql :SET 'pipeline.name' = 'batch_order_statistics_job';SET 'table.exec.resource.default-parallelism'= '2';SET 'sql-client.execution.result-mode' = 'tableau';-- 订单主表CREATE TABLE if not exists oms_order_source
( id
bigint COMMENT 'ID', sellerTenantId
bigint COMMENT '租户Id', orderNo
STRING COMMENT '订单号', orderType
tinyint COMMENT '订单类型(1:增值商品/2:普通商品/3:物流)', orderStatus
tinyint COMMENT '订单状态()', deptId
bigint COMMENT '部门Id', customerId
bigint COMMENT '客户id', payableAmount
DECIMAL(10, 2) COMMENT '应付金额', creatorId
bigint COMMENT '下单人', createTime
TIMESTAMP(3) COMMENT '下单时间', createDate
as TO_DATE(DATE_FORMAT(createTime,'yyyy-MM-dd')) comment '下单日期', PRIMARY KEY (id
) NOT ENFORCED) comment '订单主表映射信息'with ( 'connector' = 'jdbc',-- 'scan.startup.mode'='latest-offset', -- 'server-id'='5400-5408', -- 'hostname' = '192.168.2.252', 'url' = 'jdbc:mysql://xxx:3306/cloud_account_system', 'username' = 'xxx', 'password' = 'xxx', 'sink.parallelism' ='4', 'table-name' = 'oms_order');-- 订单统计CREATE TABLE if not exists dws_order_sync_record_sink( id
BIGINT COMMENT '主键', sellerTenantId
BIGINT COMMENT '租户Id', customerId
BIGINT COMMENT '客户Id', deptId
BIGINT COMMENT '部门Id', creatorId
BIGINT COMMENT '下单人', createDate
DATE COMMENT '下单日期', orderNo
STRING COMMENT '订单总量', payableAmount
DECIMAL(10, 2) COMMENT '应收金额', PRIMARY KEY (sellerTenantId
, deptId
, creatorId
, createDate
) NOT ENFORCED)WITH ('connector' = 'jdbc', -- 'url' = 'jdbc:mysql://192.168.2.252:3306/cloud_account_system', -- 'username' = 'cloud_test', -- 'password' = 'Js81#5q2Pr', 'url' = 'jdbc:mysql://xxxx:3306/cloud_account_system', 'username' = 'xxx', 'password' = 'xxx', -- 'sink.buffer-flush.max-rows'='1000', 'sink.parallelism' ='4', 'table-name' = 'dws_order_sync_record');-- 订单数据统计-- insert into dws_order_sync_record_sinkselect a.id, a.sellerTenantId, a.customerId, a.deptId, a.creatorId, a.createDate, a.orderNo, a.payableAmountFROM oms_order_source awhere a.orderType = 2 and a.orderStatus <>4 and a.sellerTenantId<>-1 and a.deptId is not null;
不查询,只是打印,数据量是对的,只要插入到mysql 数据就少了,我看了下都是用的主键,都没一做聚合,不应该出现主键一样的情况。"
同一个 SQL 在 Flink CDC 中同步到 MySQL 会少数据,但同步到 Elasticsearch 没有问题,可能有几个原因导致这种情况:
数据源和目标数据库配置有差异:请确保源数据和目标数据库的连接配置正确,并且表结构、字段映射和数据类型一致。如果配置有差异,可能导致数据在同步过程中丢失或转换错误。
CDC 连接和数据捕获设置不正确:Flink CDC 需要正确配置 CDC 连接和数据捕获,以确保能够准确捕获变更数据。请确保 CDC 连接配置正确,并且捕获到的变更数据包含了您期望同步的数据。
MySQL 配置问题:检查 MySQL 数据库的配置,例如事务隔离级别、日志配置等,确保其与 Flink CDC 的要求兼容。特别关注 MySQL 的事务提交和日志刷新机制,确保数据能够及时提交和持久化到磁盘。
网络或资源问题:检查网络连接和资源利用情况,确保 Flink CDC 和 MySQL 之间的网络通畅,并且目标数据库的资源(例如 CPU、内存、磁盘)充足,不会导致数据同步延迟或丢失。
CDC 同步逻辑问题:检查 Flink CDC 的同步逻辑,确保在数据写入 MySQL 之前没有过滤、转换或丢弃任何数据。
你的where过滤条件限制了。主键是否一致,where是否一致,这都是你需要确定的。看看sink端的数据库的union key 是否保持一致。你说的很对,但是你需要自己排查,没人能帮你,这么大数据量丢失,where条件,各个主键在mysql端,source端,sink端口,仔细对比flink的ddl上主键等。此回答整理至钉群“Flink CDC 社区”。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。