Flink CDC中select 的时候,看了下,是正常的,就是插入得时候偶问题,什么原因?

"Flink CDC中select 的时候,看了下,是正常的,就是插入得时候偶问题,什么原因?
c0b939b0cbcf4271ee55c62dd9282f20.png
b84313638e8bf4bdb6dfade58c4c44c3.png
SET 'pipeline.name' = 'batch_order_statistics_job';
SET 'table.exec.resource.default-parallelism'= '2';
SET 'sql-client.execution.result-mode' = 'tableau';
-- 订单主表
CREATE TABLE if not exists oms_order_source
(
id bigint COMMENT 'ID',
sellerTenantId bigint COMMENT '租户Id',
orderNo STRING COMMENT '订单号',
orderType tinyint COMMENT '订单类型(1:增值商品/2:普通商品/3:物流)',
orderStatus tinyint COMMENT '订单状态()',
deptId bigint COMMENT '部门Id',
customerId bigint COMMENT '客户id',
payableAmount DECIMAL(10, 2) COMMENT '应付金额',
creatorId bigint COMMENT '下单人',
createTime TIMESTAMP(3) COMMENT '下单时间',
createDate as TO_DATE(DATE_FORMAT(createTime,'yyyy-MM-dd')) comment '下单日期',
PRIMARY KEY (id) NOT ENFORCED
) comment '订单主表映射信息'
with ( 'connector' = 'jdbc',
-- 'scan.startup.mode'='latest-offset',
-- 'server-id'='5400-5408',
-- 'hostname' = '192.168.2.252',
'url' = 'jdbc:mysql://xxx:3306/cloud_account_system',
'username' = 'xxx',
'password' = 'xxx',
'sink.parallelism' ='4',
'table-name' = 'oms_order');

-- 订单统计
CREATE TABLE if not exists dws_order_sync_record_sink
( id BIGINT COMMENT '主键',
sellerTenantId BIGINT COMMENT '租户Id',
customerId BIGINT COMMENT '客户Id',
deptId BIGINT COMMENT '部门Id',
creatorId BIGINT COMMENT '下单人',
createDate DATE COMMENT '下单日期',
orderNo STRING COMMENT '订单总量',
payableAmount DECIMAL(10, 2) COMMENT '应收金额',
PRIMARY KEY (sellerTenantId, deptId, creatorId, createDate) NOT ENFORCED
)
WITH ('connector' = 'jdbc',
-- 'url' = 'jdbc:mysql://192.168.2.252:3306/cloud_account_system',
-- 'username' = 'cloud_test',
-- 'password' = 'Js81#5q2Pr',
'url' = 'jdbc:mysql://xxxx:3306/cloud_account_system',
'username' = 'xxx',
'password' = 'xxx',
-- 'sink.buffer-flush.max-rows'='1000',
'sink.parallelism' ='4',
'table-name' = 'dws_order_sync_record');

-- 订单数据统计
-- insert into dws_order_sync_record_sink
select a.id,
a.sellerTenantId,
a.customerId,
a.deptId,
a.creatorId,
a.createDate,
a.orderNo,
a.payableAmount
FROM oms_order_source a
where a.orderType = 2
and a.orderStatus <>4
and a.sellerTenantId<>-1
and a.deptId is not null;
"

展开
收起
十一0204 2023-08-09 08:56:31 86 分享 版权
1 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果你在进行插入操作时遇到问题,可能有多个原因导致。以下是一些常见的原因和解决方法:

    数据库连接问题:确保你的 Flink CDC 应用程序能够正确连接到 Oracle 数据库。检查数据库的连接配置,包括主机名、端口号、用户名、密码等。还要确保数据库服务器正常运行,并且网络连接可靠。

    表结构不匹配:Flink CDC 需要源表和目标表之间的结构匹配,包括列的名称、类型和顺序。如果源表和目标表的结构不匹配,插入操作可能会失败。请确保源表和目标表的结构一致。

    主键冲突:如果目标表已经存在相同的主键值,并且你正在尝试插入重复的主键值,那么插入操作将会失败。请确保你的插入操作不会引起主键冲突,或者考虑使用更新操作来处理重复值。

    数据类型不匹配:检查源表和目标表中列的数据类型是否一致。如果数据类型不匹配,Flink CDC 可能无法正确地进行数据插入。

    数据转换问题:如果源表和目标表之间存在数据转换的需求,例如日期格式转换或字符串处理,确保转换逻辑正确并且与数据类型兼容。

    并发冲突:如果多个并发的插入操作试图同时修改相同的数据行,可能会引发并发冲突。这可能导致某些插入操作失败。考虑使用事务或调整并发控制策略来处理并发冲突。

    错误处理和日志记录:在你的 Flink CDC 应用程序中,确保对插入操作进行适当的错误处理和日志记录。这样可以帮助你定位和解决插入问题。

    2023-08-13 16:53:41
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理