Flink CDC里有没有大佬试过cdc3.0.0到doris管道模式全库同步。我这边按照git配置后,启用同步脚本报错org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type java.lang.String
from Object value (token JsonToken.START_OBJECT
)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您提供的错误信息和问题描述,以下是针对Flink CDC 3.0.0到Doris管道模式全库同步报错的详细分析与解决方案:
错误信息如下:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type java.lang.String from Object value (token JsonToken.START_OBJECT)
该错误表明在反序列化过程中,Jackson库尝试将一个JSON对象(JsonToken.START_OBJECT
)解析为字符串类型,但未能成功。这通常是由于以下原因之一导致的: 1. 配置文件格式不正确:某些字段的值被错误地定义为对象而非字符串。 2. 依赖版本冲突:Flink CDC 3.0.0与Doris连接器或其他依赖库之间可能存在版本不兼容问题。 3. 参数传递错误:在配置CDC Source或Sink时,某些参数的格式不符合预期。
确保您的配置文件(如flink-conf.yaml
或SQL脚本中的WITH参数)中所有字段的值都符合预期格式。例如: - 如果某个字段需要字符串类型的值,请确保其值用双引号包裹,例如:'key' = "value"
。 - 如果字段需要JSON对象,请确保其格式正确,例如:'key' = {"subKey": "subValue"}
。
参考示例:
CREATE TABLE doris_sink (
id INT,
name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'http://<doris-fe-host>:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = ''
);
如果配置文件中有类似以下内容,请检查是否符合上述要求:
cdc.source.config: {
"hostname": "localhost",
"port": 3306,
"username": "root",
"password": "password"
}
Flink CDC 3.0.0与Doris连接器可能存在版本冲突。建议按照以下步骤排查: 1. 确认Doris连接器版本:确保使用的Doris连接器版本与Flink CDC 3.0.0兼容。可以参考Doris官方文档或社区支持的版本列表。 2. 排除冲突依赖:在pom.xml
中明确指定依赖版本,并使用<scope>provided</scope>
排除不必要的依赖。例如:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-doris</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
Jackson库的版本不一致可能导致反序列化失败。建议采取以下措施: 1. 统一Jackson版本:在pom.xml
中显式指定Jackson库的版本,确保与Flink CDC 3.0.0兼容。例如:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.0</version>
</dependency>
classloader.parent-first-patterns.additional: com.fasterxml.jackson
在使用Flink CDC 3.0.0时,确保CDC Source的配置正确无误。以下是一个典型的MySQL CDC Source配置示例:
CREATE TABLE mysql_source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'test_db',
'table-name' = 'test_table'
);
请特别注意以下几点: - 主键配置:确保表中定义了主键,否则可能导致反序列化失败。 - 表名格式:table-name
需要包含数据库名,例如:test_db.test_table
。
为了进一步定位问题,可以启用Flink的调试日志,查看详细的错误堆栈信息。在flink-conf.yaml
中添加以下配置:
logger.level: DEBUG
通过以上步骤,您可以逐步排查并解决MismatchedInputException
问题。如果问题仍未解决,请提供以下信息以便进一步分析: 1. 完整的SQL脚本或配置文件内容。 2. 使用的Flink、Flink CDC和Doris连接器的具体版本。 3. 错误日志的完整堆栈信息。
希望以上内容能够帮助您解决问题!您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。