报错如下:
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
Caused by: java.lang.NullPointerException
源表的表结构如下:
create table RUNCARD_T
(
  id                     NUMBER not null,
  sn                     VARCHAR2(60),
  parent_sn              VARCHAR2(60),
  wo_id                  NUMBER,
  route_id               NUMBER,
  current_site           NUMBER,
  wip_operation          NUMBER,
  last_operation         NUMBER,
  last_operation_counter NUMBER,
  status                 NUMBER,
  turnin_no              VARCHAR2(30),
  tracking_no            VARCHAR2(30),
  carton_no              VARCHAR2(60),
  pallet_no              VARCHAR2(60),
  gg_no                  VARCHAR2(35),
  gg_item                NUMBER,
  smt_turnin_no          VARCHAR2(30),
  rma_count              NUMBER,
  input_time             DATE,
  operation_time         DATE,
  turnin_time            DATE,
  ship_time              DATE,
  replace_flag           VARCHAR2(10),
  sample_batch           VARCHAR2(30),
  sample_flag            VARCHAR2(10),
  warranty               NUMBER,
  attribute1             VARCHAR2(30),
  ignore_defect_times    NUMBER,
  bag_no                 VARCHAR2(30),
  attribute4             VARCHAR2(30),
  attribute5             VARCHAR2(30)
)
目标pg表结构如下:
CREATE TABLE abc.runcard_t (
    id numeric NOT NULL,
    sn varchar(60) NULL,
    parent_sn varchar(60) NULL,
    wo_id numeric NULL,
    route_id numeric NULL,
    current_site numeric NULL,
    wip_operation numeric NULL,
    last_operation numeric NULL,
    last_operation_counter numeric NULL,
    status numeric NULL,
    turnin_no varchar(30) NULL,
    tracking_no varchar(30) NULL,
    carton_no varchar(60) NULL,
    pallet_no varchar(60) NULL,
    gg_no varchar(35) NULL,
    gg_item numeric NULL,
    smt_turnin_no varchar(30) NULL,
    rma_count numeric NULL,
    input_time timestamp NULL,
    operation_time timestamp NULL,
    turnin_time timestamp NULL,
    ship_time timestamp NULL,
    replace_flag varchar(10) NULL,
    sample_batch varchar(30) NULL,
    sample_flag varchar(10) NULL,
    warranty numeric NULL,
    attribute1 varchar(30) NULL,
    ignore_defect_times numeric NULL,
    bag_no varchar(30) NULL,
    attribute4 varchar(30) NULL,
    attribute5 varchar(30) NULL,
    CONSTRAINT runcard_t_pkey PRIMARY KEY (id)
);
flink sql 创建表如下:
CREATE TABLE oracle_runcard_t (
    id BIGINT,
    sn VARCHAR(60),
    parent_sn VARCHAR(60),
    wo_id BIGINT,
    route_id BIGINT,
    current_site BIGINT,
    wip_operation BIGINT,
    last_operation BIGINT,
    last_operation_counter BIGINT,
    status BIGINT,
    turnin_no VARCHAR(30),
    tracking_no VARCHAR(30),
    carton_no VARCHAR(60),
    pallet_no VARCHAR(60),
    gg_no VARCHAR(35),
    gg_item BIGINT,
    smt_turnin_no VARCHAR(30),
    rma_count BIGINT,
    input_time TIMESTAMP,
    operation_time TIMESTAMP,
    turnin_time TIMESTAMP,
    ship_time TIMESTAMP,
    replace_flag VARCHAR(10),
    sample_batch VARCHAR(30),
    sample_flag VARCHAR(10),
    warranty BIGINT,
    attribute1 VARCHAR(30),
    ignore_defect_times BIGINT,
    bag_no VARCHAR(30),
    attribute4 VARCHAR(30),
    attribute5 VARCHAR(30),
    PRIMARY KEY (id) NOT ENFORCED
) 
CREATE TABLE postgres_runcard_t (
    id BIGINT,
    sn VARCHAR(60),
    parent_sn VARCHAR(60),
    wo_id BIGINT,
    route_id BIGINT,
    current_site BIGINT,
    wip_operation BIGINT,
    last_operation BIGINT,
    last_operation_counter BIGINT,
    status BIGINT,
    turnin_no VARCHAR(30),
    tracking_no VARCHAR(30),
    carton_no VARCHAR(60),
    pallet_no VARCHAR(60),
    gg_no VARCHAR(35),
    gg_item BIGINT,
    smt_turnin_no VARCHAR(30),
    rma_count BIGINT,
    input_time TIMESTAMP,
    operation_time TIMESTAMP,
    turnin_time TIMESTAMP,
    ship_time TIMESTAMP,
    replace_flag VARCHAR(10),
    sample_batch VARCHAR(30),
    sample_flag VARCHAR(10),
    warranty BIGINT,
    attribute1 VARCHAR(30),
    ignore_defect_times BIGINT,
    bag_no VARCHAR(30),
    attribute4 VARCHAR(30),
    attribute5 VARCHAR(30),
    PRIMARY KEY (id) NOT ENFORCED
) 
怀疑是字段问题,但没有找到具体是哪个字段引起。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。