开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinksql同步mysql数据到doris提示incompatible types

flink版本为:1.19
cdc任务报错,请问是否是mysql8.x的配置有什么问题?具体报错如下:
Caused by: java.io.InvalidClassException: org.apache.flink.cdc.connectors.shaded.com.fasterxml.jackson.databind.cfg.MapperConfig; incompatible types for field _mapperFeatures

flinksql如下:
SET 'execution.checkpointing.interval' = '40s';
SET 'execution.checkpointing.max-concurrent-checkpoints'='1';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';

CREATE TABLE IF NOT EXISTS mysql_tag_categories_source(
id varchar(80),
parent_id varchar(80),
name varchar(512),
level INT,
ranking BIGINT,
path varchar(512),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'server-id' = '60200-60300',
'database-name' = 'sc_stagm',
'table-name' = 'tag_categories',
'hostname' = '10.8.1.20',
'port' = '3317',
'username' = 's1',
'password' = '$0&M@2o22',
'server-time-zone' = 'Asia/Shanghai'
);

CREATE TABLE IF NOT EXISTS doris_tag_categories_sink (
id string,
parent_id string,
name string,
level INT,
ranking BIGINT,
path string,
PRIMARY KEY(id) NOT ENFORCED
)
WITH (
'connector' = 'doris',
'table.identifier' = 'sc_scndp.dim_tag_categories',
'sink.label-prefix' = 'doris_dim_tag_categories_1',
'fenodes' = '10.8.1.24:8030',
'username' = 'root',
'password' = '123456',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-2pc'='true'
);
insert into doris_tag_categories_sink select * FROM mysql_tag_categories_source;

展开
收起
hlg4kunfbuxuq 2024-11-19 21:55:17 16 0
1 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载