开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

"Flink CDC中 为什么这么写编译不过?

"Flink CDC中 为什么这么写编译不过?
050b22ab838f1a8814a780d8910da719.png
CREATE TABLE RMS_STORAGE_OUT_CARDS(
C_ID INT,
C_STORAGE_OUT_ITEM_ID INT,
C_CARD_NUMBER STRING,
C_GOODS_ID INT,
C_STATUS INT,
C_CREATEDBY STRING,
C_CREATEDON TIMESTAMP,
C_CHANNEL_CODE STRING,
C_HOSPID STRING,
proc_time AS PROCTIME(),
PRIMARY KEY (C_ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'oracle12cdb.xxxx.com',
'port' = '1521',
'username' = 'misnew',
'password' = 'mis32006',
'database-name' = 'XXXXRAC',
'schema-name' = 'RMS',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true',
'debezium.include.schema.changes' = 'false',
'scan.incremental.snapshot.chunk.key-column' = 'C_ID',
'scan.incremental.snapshot.chunk.size' = '80960' ,
'scan.snapshot.fetch.size' = '1024',
'table-name' = 'RMS_STORAGE_OUT_CARDS'
);

CREATE TABLE RMS_GOODS_BATCH(
C_ID INT,
C_NAME STRING,
C_OUTER_ID INT,
C_TYPE INT,
C_CARDTYPE INT,
C_START_DATE TIMESTAMP,
C_END_DATE TIMESTAMP,
C_PRICE DECIMAL(10,2),
C_GOODS_ID INT,
C_ENABLE INT,
C_FLAG INT,
C_SMS_TPL STRING,
C_CREATEDBY STRING,
C_CREATEDON TIMESTAMP,
C_OP_USER STRING,
C_OP_TIME TIMESTAMP,
PRIMARY KEY (C_ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@oracle12cdb.xxxx.com:1521:xxxxrac',
'driver' = 'oracle.jdbc.driver.OracleDriver',
'username' = 'misnew',
'password' = 'mis32006',
'table-name' = 'RMS.RMS_GOODS_BATCH'
);

CREATE TABLE RMS_STORAGE_OUT_ITEM(
C_ID INT,
C_STORAGE_OUT_ID INT,
C_SALE_ITEM_ID INT,
C_BATCH_ID STRING,
C_GOODS_ID INT,
C_GOODS_CODE STRING,
C_SALE_PRICE DECIMAL(10,2),
C_CARD_TYPE STRING,
C_GOODS_NAME STRING,
C_AMOUNT INT,
C_REMARK STRING,
C_PRICE DECIMAL(10,2),
C_CHARGE_TYPE INT,
C_CHARGE_ID INT,
PRIMARY KEY (C_ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@oracle12cdb.xxxx.com:1521:xxxxrac',
'driver' = 'oracle.jdbc.driver.OracleDriver',
'username' = 'misnew',
'password' = 'mis32006',
'table-name' = 'RMS.RMS_STORAGE_OUT_ITEM'
);

CREATE TABLE sink_rms_storage_out_cards (
C_ID INT,
C_CARD_NUMBER STRING,
C_STATUS INT,
C_CREATEDBY STRING,
C_CREATEDON TIMESTAMP,
C_CHANNEL_CODE STRING,
C_HOSPID STRING,
CARD_TYPE INT,
SALE_PRICE DECIMAL(10,2),
C_CHARGE_TYPE INT,
PRIMARY KEY (C_ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.108.37.132:3306/test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456',
'table-name' = 'rms_storage_out_cards2');

insert into sink_rms_storage_out_cards
select
o.C_ID ,
o.C_CARD_NUMBER ,
o.C_STATUS ,
o.C_CREATEDBY ,
o.C_CREATEDO
"

展开
收起
十一0204 2023-07-26 08:16:48 68 0
3 条回答
写回答
取消 提交回答
  • 很抱歉,你提到了 "Flink CDC中 为什么这么写编译不过",但未提供具体的代码或错误信息。我无法确定你指的是哪一部分代码和编译错误。

    如果你遇到编译错误,请确保以下几点:

    1. 语法错误:检查代码中是否存在拼写错误、缺少分号(;)或不匹配的括号等常见的语法问题。

    2. 导入包:确认你是否正确导入所需的依赖包。在编写 Flink 程序时,需要引入 Flink 的相关类和接口,以及其他依赖库。

    3. 版本兼容性:确保所使用的 Flink 版本与你的代码和依赖项版本相匹配,并且没有冲突。不同版本之间可能存在 API 或功能上的差异。

    4. 上下文环境:检查你的代码是否在正确的上下文环境中执行。比如,在 Flink 中,DAG(有向无环图)的构建通常在 ExecutionEnvironment 或 StreamExecutionEnvironment 中进行。

    如果以上检查仍然无法解决问题,请提供更多具体的错误信息、代码片段或相关背景信息,以便我能够更好地帮助你找到问题所在并提供解决方案。

    2023-07-31 22:49:03
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    由以下原因造成的:
    缺少必要的依赖项:Flink CDC需要一些第三方库来运行,例如Debezium、Kafka等。如果您没有正确地将这些库添加到您的项目中,可能会出现编译错误。请确保您已经正确地添加了所有必要的依赖项,可以通过Maven或Gradle等构建工具来管理依赖项。
    错误的代码语法:如果您的代码语法存在错误,编译器将无法正确解析代码并报告错误。请检查您的代码,确保它符合Java或Scala的语法规则。您可以尝试使用IDE或文本编辑器等工具来检查代码语法错误。
    不兼容的版本:Flink CDC需要与Flink本身和其他依赖项的版本兼容。如果您使用的是不兼容的版本,可能会导致编译错误。请确保您使用的所有库的版本都与Flink CDC兼容,并且没有发生版本冲突。

    2023-07-29 15:51:46
    赞同 展开评论 打赏
  • 意中人就是我呀!

    " 你sql写的不对,当然有问题。
    insert into sink_rms_storage_out_cards
    select
    o.C_ID ,
    o.C_CARD_NUMBER ,
    o.C_STATUS ,
    o.C_CREATEDBY ,
    o.C_CREATEDON ,
    o.C_CHANNEL_CODE ,
    o.C_HOSPID ,
    b.C_TYPE ,
    i.C_SALE_PRICE ,
    i.C_CHARGE_TYPE
    from
    (
    select C_ID
    ,C_CARD_NUMBER
    ,C_STATUS
    ,C_CREATEDBY
    ,C_CREATEDON
    ,C_CHANNEL_CODE
    ,C_HOSPID
    ,C_STORAGE_OUT_ITEM_ID
    ,cast(C_STORAGE_OUT_ITEM_ID as string) as C_STORAGE_OUT_ITEM_ID2
    from RMS_STORAGE_OUT_CARDS
    ) AS o
    LEFT JOIN
    BATCH_ID FROM RMS_STORAGE_OUT_ITEM FOR SYSTEM_TIME AS OF o.proc_time AS i
    ON o.C_STORAGE_OUT_ITEM_ID2 = i.C_ID
    LEFT JOIN
    RMS_GOODS_BATCH FOR SYSTEM_TIME AS OF o.proc_time AS b
    ON i.BATCH_ID = b.C_ID
    ;
    o表忘了加proc_time,自己手机加下
    此回答整理至钉群“Flink CDC 社区”。"

    2023-07-26 12:15:40
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载