Flink CDC消费kafka的数据写入doria中报这种错,大家有遇到过么?
我看我的配置文件中pom.xml文件里是有这个依赖的
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC消费Kafka的数据写入Doris时出现错误,而您提到pom.xml
文件中已经包含了相关依赖。以下是针对该问题的详细分析和解决方案:
根据知识库中的信息,以下是一些可能导致该问题的原因: - 依赖冲突或版本不匹配:虽然pom.xml
中包含了相关依赖,但可能存在依赖冲突或版本不兼容的情况。 - Kafka与Flink网络连通性问题:即使网络连通,Kafka Broker返回的元信息(如Endpoint)可能无法被Flink正确解析。 - 数据格式或Schema不匹配:如果Kafka中的数据格式(如JSON、Canal、Debezium)与Flink表定义的Schema不一致,可能会导致解析失败。 - CDC事件重复投递:在非正常情况下,CDC工具(如Canal或Debezium)可能会投递重复的变更事件,导致Flink处理异常。
为了准确定位问题,请按照以下步骤进行排查:
pom.xml
中引入的依赖版本与Flink和Kafka的版本兼容。例如:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
mvn dependency:tree
命令检查是否存在依赖冲突。zkCli.sh
ls /brokers/ids
get /brokers/ids/{your_broker_id}
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'canal-json'
);
SET table.exec.source.cdc-events-duplicate=true;
根据排查结果,采取以下措施解决问题:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
advertised.listeners
配置正确,并将转发地址添加到Kafka Broker的Listener中。CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
CREATE TABLE KafkaTable (
user_id BIGINT PRIMARY KEY NOT ENFORCED,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'canal-json'
);
WITH
参数中正确设置。通过以上步骤,您可以逐步定位并解决Flink CDC消费Kafka数据写入Doris时的错误。如果问题仍未解决,请提供具体的错误日志以便进一步分析。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。