Flink 1.18.1: syncing Oracle 19c to PostgreSQL 15. After the job starts, no data is ever synchronized.

The error is as follows:
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
Caused by: java.lang.NullPointerException

The structure of the source (Oracle) table is as follows:
create table RUNCARD_T
(
id NUMBER not null,
sn VARCHAR2(60),
parent_sn VARCHAR2(60),
wo_id NUMBER,
route_id NUMBER,
current_site NUMBER,
wip_operation NUMBER,
last_operation NUMBER,
last_operation_counter NUMBER,
status NUMBER,
turnin_no VARCHAR2(30),
tracking_no VARCHAR2(30),
carton_no VARCHAR2(60),
pallet_no VARCHAR2(60),
gg_no VARCHAR2(35),
gg_item NUMBER,
smt_turnin_no VARCHAR2(30),
rma_count NUMBER,
input_time DATE,
operation_time DATE,
turnin_time DATE,
ship_time DATE,
replace_flag VARCHAR2(10),
sample_batch VARCHAR2(30),
sample_flag VARCHAR2(10),
warranty NUMBER,
attribute1 VARCHAR2(30),
ignore_defect_times NUMBER,
bag_no VARCHAR2(30),
attribute4 VARCHAR2(30),
attribute5 VARCHAR2(30)
)

The structure of the target PostgreSQL table is as follows:
CREATE TABLE abc.runcard_t (
id numeric NOT NULL,
sn varchar(60) NULL,
parent_sn varchar(60) NULL,
wo_id numeric NULL,
route_id numeric NULL,
current_site numeric NULL,
wip_operation numeric NULL,
last_operation numeric NULL,
last_operation_counter numeric NULL,
status numeric NULL,
turnin_no varchar(30) NULL,
tracking_no varchar(30) NULL,
carton_no varchar(60) NULL,
pallet_no varchar(60) NULL,
gg_no varchar(35) NULL,
gg_item numeric NULL,
smt_turnin_no varchar(30) NULL,
rma_count numeric NULL,
input_time timestamp NULL,
operation_time timestamp NULL,
turnin_time timestamp NULL,
ship_time timestamp NULL,
replace_flag varchar(10) NULL,
sample_batch varchar(30) NULL,
sample_flag varchar(10) NULL,
warranty numeric NULL,
attribute1 varchar(30) NULL,
ignore_defect_times numeric NULL,
bag_no varchar(30) NULL,
attribute4 varchar(30) NULL,
attribute5 varchar(30) NULL,
CONSTRAINT runcard_t_pkey PRIMARY KEY (id)
);

The tables are created in Flink SQL as follows:

CREATE TABLE oracle_runcard_t (
id BIGINT,
sn VARCHAR(60),
parent_sn VARCHAR(60),
wo_id BIGINT,
route_id BIGINT,
current_site BIGINT,
wip_operation BIGINT,
last_operation BIGINT,
last_operation_counter BIGINT,
status BIGINT,
turnin_no VARCHAR(30),
tracking_no VARCHAR(30),
carton_no VARCHAR(60),
pallet_no VARCHAR(60),
gg_no VARCHAR(35),
gg_item BIGINT,
smt_turnin_no VARCHAR(30),
rma_count BIGINT,
input_time TIMESTAMP,
operation_time TIMESTAMP,
turnin_time TIMESTAMP,
ship_time TIMESTAMP,
replace_flag VARCHAR(10),
sample_batch VARCHAR(30),
sample_flag VARCHAR(10),
warranty BIGINT,
attribute1 VARCHAR(30),
ignore_defect_times BIGINT,
bag_no VARCHAR(30),
attribute4 VARCHAR(30),
attribute5 VARCHAR(30),
PRIMARY KEY (id) NOT ENFORCED
)

CREATE TABLE postgres_runcard_t (
id BIGINT,
sn VARCHAR(60),
parent_sn VARCHAR(60),
wo_id BIGINT,
route_id BIGINT,
current_site BIGINT,
wip_operation BIGINT,
last_operation BIGINT,
last_operation_counter BIGINT,
status BIGINT,
turnin_no VARCHAR(30),
tracking_no VARCHAR(30),
carton_no VARCHAR(60),
pallet_no VARCHAR(60),
gg_no VARCHAR(35),
gg_item BIGINT,
smt_turnin_no VARCHAR(30),
rma_count BIGINT,
input_time TIMESTAMP,
operation_time TIMESTAMP,
turnin_time TIMESTAMP,
ship_time TIMESTAMP,
replace_flag VARCHAR(10),
sample_batch VARCHAR(30),
sample_flag VARCHAR(10),
warranty BIGINT,
attribute1 VARCHAR(30),
ignore_defect_times BIGINT,
bag_no VARCHAR(30),
attribute4 VARCHAR(30),
attribute5 VARCHAR(30),
PRIMARY KEY (id) NOT ENFORCED
)
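
The two Flink SQL definitions above are shown without their WITH clauses. For reference, here is a minimal sketch of how such a pair is commonly wired up, assuming the Flink CDC `oracle-cdc` source connector and the Flink JDBC sink connector. Every connection value (hosts, ports, credentials, the `ORCL` database, the `MES` schema, the `mydb` database) is a placeholder and not taken from the post. The `_demo` table names and the shortened column list are for illustration only.

```sql
-- Source side: Oracle CDC. Assumes the Oracle CDC SQL connector jar is on the classpath.
CREATE TABLE oracle_runcard_t_demo (
  id BIGINT,
  sn VARCHAR(60),
  input_time TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'oracle-host',   -- placeholder
  'port' = '1521',              -- placeholder
  'username' = 'flinkuser',     -- placeholder
  'password' = 'flinkpw',       -- placeholder
  'database-name' = 'ORCL',     -- placeholder
  'schema-name' = 'MES',        -- placeholder schema that owns RUNCARD_T
  'table-name' = 'RUNCARD_T'
);

-- Sink side: PostgreSQL through the JDBC connector.
-- Declaring the primary key makes the sink write in upsert mode.
CREATE TABLE postgres_runcard_t_demo (
  id BIGINT,
  sn VARCHAR(60),
  input_time TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://pg-host:5432/mydb',  -- placeholder
  'table-name' = 'abc.runcard_t',
  'username' = 'pguser',        -- placeholder
  'password' = 'pgpw'           -- placeholder
);

-- The synchronization job itself.
INSERT INTO postgres_runcard_t_demo
SELECT id, sn, input_time
FROM oracle_runcard_t_demo;
```

Oracle stores unquoted identifiers in upper case, so `schema-name` and `table-name` are normally written in upper case to match the data dictionary.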

I suspect the problem is caused by one of the columns, but I have not been able to find which one.
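
One way to test that suspicion is to narrow the source declaration to a few columns and add the rest back in small groups, writing to a `print` sink so that PostgreSQL is out of the picture. The sketch below assumes the Flink CDC `oracle-cdc` connector. The connection values and the `_probe` table names are placeholders. It also declares the unconstrained Oracle `NUMBER` column as `DECIMAL(38, 0)` instead of `BIGINT`, which is an assumption worth testing, not a confirmed fix. Since the `NullPointerException` is raised in the change event producer, the cause may turn out to lie outside the column declarations altogether.

```sql
-- Probe source: start with the key and one other column, then add more columns back.
CREATE TABLE oracle_runcard_t_probe (
  id DECIMAL(38, 0),            -- assumption: try DECIMAL for a NUMBER column without precision
  sn VARCHAR(60),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'oracle-host',   -- placeholder
  'port' = '1521',              -- placeholder
  'username' = 'flinkuser',     -- placeholder
  'password' = 'flinkpw',       -- placeholder
  'database-name' = 'ORCL',     -- placeholder
  'schema-name' = 'MES',        -- placeholder
  'table-name' = 'RUNCARD_T'
);

-- Probe sink: prints every change row to the TaskManager output.
CREATE TABLE runcard_t_print_probe (
  id DECIMAL(38, 0),
  sn VARCHAR(60)
) WITH (
  'connector' = 'print'
);

INSERT INTO runcard_t_print_probe
SELECT id, sn
FROM oracle_runcard_t_probe;
```

If this reduced job also fails with the same exception, the column list is unlikely to be the cause. If it runs, re-adding the `NUMBER` and `DATE` columns a few at a time should show which declaration triggers the failure.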

Posted by guest plhezj4vh3jvg on 2025-03-18 16:29:29, from Jiangsu
