Flink CDC 各位我flink 读取kafka数据然后入库到starrocks
CREATE TABLE mall_activity (
    id                     bigint,
    create_time            TIMESTAMP,
    create_by              bigint,
    create_by_name         string,
    update_time            TIMESTAMP,
    update_by              bigint,
    update_by_name         string,
    activity_name          string,
    appointment_start_time TIMESTAMP,
    end_time               TIMESTAMP,
    exchange_end_time      TIMESTAMP,
    exchange_start_time    TIMESTAMP,
    pick_end_time          TIMESTAMP,
    pick_start_time        TIMESTAMP,
    term                   string,
    year                   int,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://devdata2:9030',
    'load-url' = 'devdata2:8030',
    'database-name' = 'service_geek_coin',
    'table-name' = 'mall_activity',
    'username' = 'xxxx',
    'password' = 'xxxxx',
    'sink.semantic' = 'exactly-once',
    'sink.label-prefix' = 'mall_activity_20231118',
    'sink.properties.partial_update' = 'true'
);
不知道问题出在哪了?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
排查下看看日志的log有没有异常输出,如果没有看下自己的任务是不是全量同步(全量同步如果资源不足, 任务也会卡住,不报错,也不写数据到Starrocks),此回答整理自钉群“Flink CDC 社区”
看起来您是在创建一个外部表,用于将来自 Kafka 主题的数据插入到 StarRocks 中。但是,您似乎没有指定外部表的源位置,即 Kafka 主题的位置。
您需要将 Kafka 主题的地址作为外部表的一部分进行指定。以下是一个示例,展示了如何指定 Kafka 主题的位置:
WITH (
    'connector' = 'kafka',
    'topic' = 'your-topic-name',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'sink.semantic' = 'at-least-once',
    'sink.buffer-flush.max-rows' = '1'
)
您还可以根据自己的实际情况修改上述代码中的其他属性,例如 format、scan.startup.mode 和 sink.semantic 等。
根据您提供的Flink CDC配置,问题可能出在以下几个方面:
JDBC URL、Load URL、Database Name、Table Name、Username和Password的配置不正确。请确保这些参数的值与您的StarRocks实例相匹配。
Sink语义设置为'exactly-once',这意味着Flink CDC会尝试确保每个变更事件只被处理一次。如果这个设置导致问题,您可以尝试将其更改为'at-least-once'或'upto-date',以允许部分更新或增量更新。
'partial_update'属性设置为'true',这意味着Flink CDC将尝试执行部分更新操作。如果您的表有主键约束,并且某些记录已经存在于表中,那么这个设置可能会导致问题。您可以尝试将其设置为'false',以便禁用部分更新功能。
检查您的Kafka主题和Flink CDC源之间的数据格式是否匹配。确保Kafka消息包含正确的字段和数据类型,以便Flink CDC可以正确解析它们并将其插入到StarRocks表中。
检查您的StarRocks表结构是否与Flink CDC配置中的表结构相匹配。确保表的主键列和字段名称与配置中的值相匹配。
查看Flink CDC的日志以获取更多详细信息。这可以帮助您诊断问题并找到解决方案。
Flink CDC 创建 starrocks 表定义看起来有些混乱,检查以下几个方面:
PRIMARY KEY(id)。database-name 和 table-name 属性的格式正确,不应带引号。jdbc-url 属性指向正确的 StarRocks 服务器地址和端口。实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。