大佬们帮看下,Flink CDC中这里有啥问题吗?

大佬们帮看下,Flink CDC中这里有啥问题吗?1.source表创建
create database kafka_database;

use kafka_database;

CREATE TABLE kafka_source1 (
test STRING
) WITH (
'connector' = 'kafka',
'topic' = 'ProcessTest',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'properties.bootstrap.servers' = '192.168.110.70:6667',
'format' = 'raw'
);

2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/usr/hdp/current/hive-client/conf'
);

use catalog myhive;

3.数据写入
INSERT INTO default.test_internal_location1

select * from kafka_database.kafka_source1 ;
其实场景很简单,就是用Flink从Kafka消费,写入HDFS,source表存在,已经建好了没问题。
sink端catalog创建同步也没问题,但是后面的数据写入过程一直找不到sink表

1.source表创建
create database kafka_database;

use kafka_database;

CREATE TABLE kafka_source1 (
test STRING
) WITH (
'connector' = 'kafka',
'topic' = 'ProcessTest',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'properties.bootstrap.servers' = '192.168.110.70:6667',
'format' = 'raw'
);

2.sink表创建通过catalog拉取hive元数据
create catalog myhive with(
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/usr/hdp/current/hive-client/conf'
);

use catalog myhive;

3.数据写入
INSERT INTO default.test_internal_location1

select * from kafka_database.kafka_source1 ;那我一直报找不到catalog里的表啊

展开
收起
真的很搞笑 2023-07-13 11:22:44 83 分享 版权
3 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,创建数据库和表是非常常见的操作,但是在您提供的代码中,如果只是创建了数据库,并且没有创建表或者其他的操作,那么并没有明显的问题。不过,如果您想要在 Flink CDC 中使用该数据库,需要确保已经正确设置了数据库连接参数,并且可以成功连接到数据库。一般来说,您需要在 Flink CDC 的配置文件中配置数据库连接信息,例如:

    properties
    Copy

    设置数据源类型为 MySQL

    flink.sources.kafka_database.source.type=mysql-cdc

    设置 MySQL 数据库的连接信息

    flink.sources.kafka_database.source.url=jdbc:mysql://localhost:3306/kafka_database
    flink.sources.kafka_database.source.username=root
    flink.sources.kafka_database.source.password=123456
    在上述配置文件中,我们使用 mysql-cdc 数据源类型来指定使用 MySQL 数据库,并设置了 MySQL 数据库的连接信息,包括 URL、用户名和密码。同时,我们将数据源的名称设置为 kafka_database,以便在其他地方引用该数据源。

    2023-07-30 09:38:13
    赞同 展开评论
  • 根据您提供的信息,有几个可能导致找不到 Catalog 中表的问题:

    1. Catalog 配置:请确保 myhive Catalog 的配置正确,并且指向了正确的 Hive 元数据存储位置。检查所使用的目录路径是否正确,以及是否具有适当的权限访问该目录。

    2. 数据库和表:检查在默认的数据库中是否存在名为 default.test_internal_location1 的表。如果没有,请创建该表,确保表的定义与您的数据写入语句中引用的表一致。

    3. 表引用:在插入数据的 SELECT 语句中,确保使用的是正确的表引用方式。根据您提供的代码,应该使用 kafka_database.kafka_source1 表的完整引用,而不仅仅是 kafka_source1。请确认 kafka_database 数据库中的表名是否正确。

    4. Flink 版本兼容性:确保您使用的 Flink 版本与所使用的 Catalog(Hive)版本兼容。某些 Flink 和 Catalog 版本组合可能会导致兼容性问题。

    5. 日志和错误消息:检查 Flink CDC 任务的日志和错误消息,查看是否有关于找不到表或 Catalog 的详细错误信息。这可能提供更多线索来解决问题。

    如果上述步骤仍然无法解决问题,请考虑以下操作:

    - 确认网络连接:确保 Flink CDC 任务能够连接到 Hive 和 Kafka 的相应服务。检查网络配置,确保能够访问到这些服务。

    - 验证功能性:尝试使用相同的配置和代码在其他环境中运行,以确定是否是特定于当前环境的问题。

    2023-07-29 23:09:27
    赞同 展开评论
  • 可以的。你们就只同步一张表吗,这种遇到修改删除很麻烦的,catalog能正常获取吗,其实场景很简单,就是用Flink从Kafka消费,写入HDFS,source表存在,已经建好了没问题。
    sink端catalog创建同步也没问题,但是后面的数据写入过程一直找不到sink表

    1.source表创建
    create database kafka_database;

    use kafka_database;

    CREATE TABLE kafka_source1 (
    test STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'ProcessTest',
    'scan.startup.mode' = 'earliest-offset',
    'properties.group.id' = 'testGroup',
    'properties.bootstrap.servers' = '192.168.110.70:6667',
    'format' = 'raw'
    );

    2.sink表创建通过catalog拉取hive元数据
    create catalog myhive with(
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/hdp/current/hive-client/conf'
    );

    3.数据写入
    INSERT INTO myhive.default.test_internal_location1

    select * from kafka_database.kafka_source1 ;,此回答整理自钉群“Flink CDC 社区”

    2023-07-13 15:42:57
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理