开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

是否可以直接进行mysql-flinkcdc-hive集成?

hi大神们,参阅资料知道flinkcdc可以代替采集端的kafka以减少组件维护成本。于是我利用flink sql client尝试进行mysql-flinkcdc-hive离线数仓链路的直接集成。我使用的jar包放在lib目录下,分别为:flink-sql-connector-mysql-cdc-2.2.1.jar,flink-connector-hive_2.12-1.14.3.jar,flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar,hive-exec-3.1.2.jar。 在sql client中使用的命令如下: ./bin/sql-client.sh embedded -j lib/flink-sql-connector-mysql-cdc-2.2.1.jar lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar

SET sql-client.execution.result-mode = tableau; SET 'execution.target' = 'yarn-per-job'; SET execution.checkpointing.interval = 5s; SET execution.checkpointing.tolerable-failed-checkpoints=3;

CREATE TABLE mysql_test_a ( id int, data string, create_time timestamp, PRIMARY KEY (id) NOT ENFORCED ) with ( 'connector'='mysql-cdc', 'hostname'='my_mysql_ip', 'port'='3306', 'username'='flink', 'password' = 'my_password', 'database-name'='flink_cdc', 'table-name'='test_a' );

此时连接mysql可以通。接着创建hive catalog。

SET table.sql-dialect=hive; CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'hive-conf-dir' = '/usr/local/hive/conf' );

show catalogs; use catalog hive_catalog; show databases; use test; show tables;

create table hive_test_a( id bigint, data String, create_time Timestamp ) STORED AS parquet TBLPROPERTIES ( 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file', 'auto-compaction'='true', 'compaction.file-size'='128MB' );

SET table.sql-dialect=default; SET execution.type = batch;

insert into hive_catalog.test.hive_test_a select * from default_catalog.default_database.mysql_test_a;

此时失败:报错提示:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Table sink 'hive_catalog.test.hive_test_a' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_test_a]], fields=[id, data, create_time])

我在网上查询不到mysql-flinkcdc-hive的数据同步案例。有人说mysql-flinkcdc-hive必须经过两步:mysql-flinkcdc-kafka,kafka-flinkcdc-hive。(因为数据格式转换的问题)这样的两步法我确实能走通。但是不懂上述的一步法为什么不行,是天然不支持这么做,还是我哪里做得不对?

此外,我也注意到了之前类似的问题。

https://developer.aliyun.com/ask/442766?spm=a2c6h.14164896.0.0.46ca7c2cAa4OjY

https://developer.aliyun.com/ask/367408?spm=a2c6h.14164896.0.0.4d917c2cOsEKvd

我认为这个问题需要被解决,希望大神们知无不言言无不尽。

展开
收起
5rda4r2daezlc 2023-04-18 15:24:26 320 0
1 条回答
写回答
取消 提交回答
  • 随心分享,欢迎友善交流讨论:)

    根据报错信息,可以看出问题在于 Table sink 'hive_catalog.test.hive_test_a' doesn't support consuming update and delete changes,即 Hive 的表不支持接收更新和删除操作。而 Flink CDC 产生的是流式的增删改变化数据,因此需要对其进行处理后再写入 Hive 中。

    你所说的两步法,将 Flink CDC 产生的增删改数据先写入 Kafka 中,再从 Kafka 中读取数据,进行格式转换后写入 Hive 中,是一种常见的解决方案。通过 Kafka,可以将 Flink CDC 产生的数据格式转换为 Hive 支持的格式,同时还可以实现数据缓存、数据去重等功能。因此,这种方式是比较可靠的,也是比较常见的解决方案。

    如果你想尝试直接将 Flink CDC 产生的数据写入 Hive,可以考虑使用 Flink 的 Table API 或者 DataStream API 进行开发。在代码中,你需要对 Flink CDC 产生的数据进行格式转换,转换为 Hive 支持的格式,然后再将数据写入 Hive。

    2023-04-18 18:00:04
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像