Flink CDC 作业报错:Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "CREATE_DTM", schema type: INT64,这是什么原因?
可是查库后并没有发现 CREATE_DTM 为 null 的记录?
-- Table that the INSERT statement in this post reads from (suffix "_src").
-- All identifiers are backtick-quoted: Flink SQL requires back-ticks around
-- reserved keywords used as column names (e.g. LANGUAGE), and quoting does not
-- change identifier case handling, so every name still resolves as before.
-- NOTE(review): REFUND_AMOUNT is declared DOUBLE; if it carries a monetary
-- amount, DECIMAL would avoid rounding -- confirm against the upstream column.
-- NOTE: the connector options of the WITH clause are missing from the original
-- post, so the statement is incomplete as shown.
CREATE TABLE IF NOT EXISTS `default_catalog`.`B2C`.`AIRLINE__T_REFUND_PAY_src` (
    `ID` BIGINT NOT NULL,
    `CREATE_DTM` TIMESTAMP NOT NULL,
    `PAY_NO` STRING NOT NULL,
    `CURRENCY` STRING NOT NULL,
    `REFUND_AMOUNT` DOUBLE NOT NULL,
    `BANK_CODE` STRING NULL,
    `STATUS` DECIMAL(22, 0) NOT NULL,
    `UPDATE_DTM` TIMESTAMP NULL,
    `REFUND_PAY_NO` STRING NOT NULL,
    `PAY_TP` DECIMAL(22, 0) NULL,
    `ORDER_NO` STRING NULL,
    `PAY_SUCCESS_NO` STRING NULL,
    `REFUND_SUCCESS_NO` STRING NULL,
    `REFUND_SUCCESS_DATE` TIMESTAMP NULL,
    `CREATE_ID` STRING NULL,
    `UPDATE_ID` STRING NULL,
    `REFUND_APPLY_DT` TIMESTAMP NULL,
    `DELETED` DECIMAL(22, 0) NOT NULL,
    `VERSION` BIGINT NULL,
    `SCCODE` STRING NOT NULL,
    `SUBSCCODE` STRING NOT NULL,
    `PCCODE` STRING NOT NULL,
    `SUBPCCODE` STRING NOT NULL,
    `NOTIFYURL` STRING NULL,
    `ORGSCDATE` STRING NOT NULL,
    `REMARK` STRING NULL,
    `RESV1` STRING NULL,
    `RESV2` STRING NULL,
    `RESV3` STRING NULL,
    `RESV4` STRING NULL,
    `REFUND_TYPE` DECIMAL(22, 0) NULL,
    `CHANNEL_CODE` STRING NULL,
    `SITE` STRING NULL,
    `PAY_ID` DECIMAL(22, 0) NULL,
    `REFUND_APPLY_ID` STRING NULL,
    `COUNTRY` STRING NULL,
    `LANGUAGE` STRING NULL,
    `SOURCE_TYPE` DECIMAL(22, 0) NULL,
    `ACCT_TIME` STRING NULL,
    `ASYNC_REFUND_TIME` STRING NULL,
    `PCSYSSEQID` STRING NULL,
    `CHARGE_OFFS` STRING NULL,
    `MEMO_INSIDE` STRING NULL,
    `PAY_AMOUNT_TYPE` STRING NULL,
    `APPLY_ID` BIGINT NULL,
    `REFUND_AMOUNT_HISTORY` BIGINT NULL,
    PRIMARY KEY (`ID`) NOT ENFORCED
) WITH (
-- Table that the INSERT statement in this post writes to (suffix "_sink").
-- It declares the same columns, in the same order, as the "_src" table, but
-- its primary key is the composite (ID, CREATE_DTM).
-- All identifiers are backtick-quoted: Flink SQL requires back-ticks around
-- reserved keywords used as column names (e.g. LANGUAGE), and quoting does not
-- change identifier case handling, so every name still resolves as before.
-- NOTE(review): REFUND_AMOUNT is declared DOUBLE; if it carries a monetary
-- amount, DECIMAL would avoid rounding -- confirm against the target column.
-- NOTE: the connector options of the WITH clause are missing from the original
-- post, so the statement is incomplete as shown.
CREATE TABLE IF NOT EXISTS `default_catalog`.`B2C`.`AIRLINE__T_REFUND_PAY_sink` (
    `ID` BIGINT NOT NULL,
    `CREATE_DTM` TIMESTAMP NOT NULL,
    `PAY_NO` STRING NOT NULL,
    `CURRENCY` STRING NOT NULL,
    `REFUND_AMOUNT` DOUBLE NOT NULL,
    `BANK_CODE` STRING NULL,
    `STATUS` DECIMAL(22, 0) NOT NULL,
    `UPDATE_DTM` TIMESTAMP NULL,
    `REFUND_PAY_NO` STRING NOT NULL,
    `PAY_TP` DECIMAL(22, 0) NULL,
    `ORDER_NO` STRING NULL,
    `PAY_SUCCESS_NO` STRING NULL,
    `REFUND_SUCCESS_NO` STRING NULL,
    `REFUND_SUCCESS_DATE` TIMESTAMP NULL,
    `CREATE_ID` STRING NULL,
    `UPDATE_ID` STRING NULL,
    `REFUND_APPLY_DT` TIMESTAMP NULL,
    `DELETED` DECIMAL(22, 0) NOT NULL,
    `VERSION` BIGINT NULL,
    `SCCODE` STRING NOT NULL,
    `SUBSCCODE` STRING NOT NULL,
    `PCCODE` STRING NOT NULL,
    `SUBPCCODE` STRING NOT NULL,
    `NOTIFYURL` STRING NULL,
    `ORGSCDATE` STRING NOT NULL,
    `REMARK` STRING NULL,
    `RESV1` STRING NULL,
    `RESV2` STRING NULL,
    `RESV3` STRING NULL,
    `RESV4` STRING NULL,
    `REFUND_TYPE` DECIMAL(22, 0) NULL,
    `CHANNEL_CODE` STRING NULL,
    `SITE` STRING NULL,
    `PAY_ID` DECIMAL(22, 0) NULL,
    `REFUND_APPLY_ID` STRING NULL,
    `COUNTRY` STRING NULL,
    `LANGUAGE` STRING NULL,
    `SOURCE_TYPE` DECIMAL(22, 0) NULL,
    `ACCT_TIME` STRING NULL,
    `ASYNC_REFUND_TIME` STRING NULL,
    `PCSYSSEQID` STRING NULL,
    `CHARGE_OFFS` STRING NULL,
    `MEMO_INSIDE` STRING NULL,
    `PAY_AMOUNT_TYPE` STRING NULL,
    `APPLY_ID` BIGINT NULL,
    `REFUND_AMOUNT_HISTORY` BIGINT NULL,
    PRIMARY KEY (`ID`, `CREATE_DTM`) NOT ENFORCED
) WITH (
-- Copy every row of the "_src" table whose CREATE_DTM is on or after
-- 2023-01-01 00:00:00 into the "_sink" table. SELECT * relies on both tables
-- declaring their columns in the same order.
INSERT INTO default_catalog.B2C.AIRLINE__T_REFUND_PAY_sink
SELECT *
FROM default_catalog.B2C.AIRLINE__T_REFUND_PAY_src
WHERE CREATE_DTM >= TO_TIMESTAMP('2023-01-01 00:00:00');
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
先核对 Flink SQL 的 DDL,再对照这张表在数据库中对应的 DDL;然后建一张结构相同的表,插入几条测试数据来复现并定位问题。此回答整理自钉群“Flink CDC 社区”。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。