hi大神们,参阅资料知道flinkcdc可以代替采集端的kafka以减少组件维护成本。于是我利用flink sql client尝试进行mysql-flinkcdc-hive离线数仓链路的直接集成。我使用的jar包放在lib目录下,分别为:flink-sql-connector-mysql-cdc-2.2.1.jar,flink-connector-hive_2.12-1.14.3.jar,flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar,hive-exec-3.1.2.jar。 在sql client中使用的命令如下: ./bin/sql-client.sh embedded -j lib/flink-sql-connector-mysql-cdc-2.2.1.jar lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
SET sql-client.execution.result-mode = tableau; SET 'execution.target' = 'yarn-per-job'; SET execution.checkpointing.interval = 5s; SET execution.checkpointing.tolerable-failed-checkpoints=3;
CREATE TABLE mysql_test_a ( id
int, data
string, create_time
timestamp, PRIMARY KEY (id
) NOT ENFORCED ) with ( 'connector'='mysql-cdc', 'hostname'='my_mysql_ip', 'port'='3306', 'username'='flink', 'password' = 'my_password', 'database-name'='flink_cdc', 'table-name'='test_a' );
此时连接mysql可以通。接着创建hive catalog。
SET table.sql-dialect=hive; CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'hive-conf-dir' = '/usr/local/hive/conf' );
show catalogs; use catalog hive_catalog; show databases; use test; show tables;
create table hive_test_a( id bigint, data String, create_time Timestamp ) STORED AS parquet TBLPROPERTIES ( 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file', 'auto-compaction'='true', 'compaction.file-size'='128MB' );
SET table.sql-dialect=default; SET execution.type = batch;
insert into hive_catalog.test.hive_test_a select * from default_catalog.default_database.mysql_test_a;
此时失败:报错提示:
[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: Table sink 'hive_catalog.test.hive_test_a' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_test_a]], fields=[id, data, create_time])
我在网上查询不到mysql-flinkcdc-hive的数据同步案例。有人说mysql-flinkcdc-hive必须经过两步:mysql-flinkcdc-kafka,kafka-flinkcdc-hive。(因为数据格式转换的问题)这样的两步法我确实能走通。但是不懂上述的一步法为什么不行,是天然不支持这么做,还是我哪里做得不对?
此外,我也注意到了之前类似的问题。
https://developer.aliyun.com/ask/442766?spm=a2c6h.14164896.0.0.46ca7c2cAa4OjY
https://developer.aliyun.com/ask/367408?spm=a2c6h.14164896.0.0.4d917c2cOsEKvd
我认为这个问题需要被解决,希望大神们知无不言言无不尽。
根据报错信息,可以看出问题在于 Table sink 'hive_catalog.test.hive_test_a' doesn't support consuming update and delete changes,即 Hive 的表不支持接收更新和删除操作。而 Flink CDC 产生的是流式的增删改变化数据,因此需要对其进行处理后再写入 Hive 中。
你所说的两步法,将 Flink CDC 产生的增删改数据先写入 Kafka 中,再从 Kafka 中读取数据,进行格式转换后写入 Hive 中,是一种常见的解决方案。通过 Kafka,可以将 Flink CDC 产生的数据格式转换为 Hive 支持的格式,同时还可以实现数据缓存、数据去重等功能。因此,这种方式是比较可靠的,也是比较常见的解决方案。
如果你想尝试直接将 Flink CDC 产生的数据写入 Hive,可以考虑使用 Flink 的 Table API 或者 DataStream API 进行开发。在代码中,你需要对 Flink CDC 产生的数据进行格式转换,转换为 Hive 支持的格式,然后再将数据写入 Hive。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。