开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink读取hive表数据(catalog形式)。怎么转换为会报错?

flink读取hive表数据(catalog形式)。怎么转换为会报错?启动命令是yarn-per-job
6fc5de2a3eb29d5354ab04aa76cad292.png
d19e2863f4457235c3edfd1f3a22024e.png

展开
收起
三分钟热度的鱼 2023-11-08 21:18:27 183 0
2 条回答
写回答
取消 提交回答
  • 在 Flink 1.11 中,使用 yarn-per-job 模式运行读取 Hive 表数据的任务时可能会遇到一些问题。主要原因是 yarn-per-job 模式的执行方式与 Flink on YARN(cluster 模式)不同,YARN per job 模式实际上是一个独立的进程,无法像 Flink cluster 模式那样直接连接到 Hadoop 集群上的 Hive Metastore。

    解决方法如下:

    1. 尝试将 Flink 的模式更改为 standalone 或者 cluster 模式。在这种模式下,Flink 可以直接连接到 Hive Metastore 并获取表结构信息。

    2. 如果必须要使用 yarn-per-job 模式,则需要单独启动 Hive Metastore 服务,并将其配置到 Flink 作业中。具体步骤如下:
      a. 在 YARN 集群上启动 Hive Metastore 服务。
      b. 在 Flink conf 中添加 hive.metastore.uris 参数,指向启动的 Hive Metastore 服务地址。
      c. 在代码中使用 catalog 方式读取 Hive 表。

    请参考以下示例:

    // 获取 Hive Catalog 实例
    HiveCatalog hiveCatalog = new HiveCatalog(hiveMetastoreURI, "default");
    
    // 创建 TableEnvironment 并注册 Hive Catalog
    TableEnvironment tableEnv = ...;
    tableEnv.registerCatalog("my_hive_catalog", hiveCatalog);
    tableEnv.useCatalog("my_hive_catalog");
    
    // 使用已注册的 Hive 表名
    Table hiveTable = tableEnv.from("my_hive_table");
    

    这里假设 Hive Metastore 地址为 $hiveMetastoreURI$,默认数据库名为 default。

    2023-11-09 22:04:57
    赞同 1 展开评论 打赏
  • Flink读取Hive表数据(catalog形式)时,如果遇到错误,可以尝试以下方法解决:

    1. 确保Hive Metastore服务已启动。可以通过以下命令检查Hive Metastore服务状态:
    hive --service metastore
    
    1. 在Flink配置文件flink-conf.yaml中,添加以下配置:
    env.java.opts: "-Dlog4j.configuration=file:/path/to/your/log4j.properties"
    

    /path/to/your/log4j.properties替换为实际的log4j配置文件路径。

    1. 在Flink配置文件flink-conf.yaml中,添加以下配置:
    table.execution.arrow.enabled: false
    table.execution.arrow.max-records-per-batch: 1000
    table.execution.arrow.max-memory-consumed-per-batch: 100M
    

    这些配置用于控制Arrow格式的数据读写性能。

    1. 如果仍然遇到问题,可以尝试使用yarn-session模式运行Flink作业,而不是yarn-per-job模式。在启动Flink作业时,使用以下命令:
    ./bin/flink run -m yarn-session /path/to/your/flink-job.jar
    

    /path/to/your/flink-job.jar替换为实际的Flink作业JAR文件路径。

    2023-11-09 10:09:54
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Hive Bucketing in Apache Spark 立即下载
    spark替代HIVE实现ETL作业 立即下载
    2019大数据技术公开课第五季—Hive迁移到MaxCompute最佳实践 立即下载