Flink - 读取 Parquet 文件 By Scala / Java

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。

一.引言

parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。Parquet 相关知识可以参考:Spark - 一文搞懂 parquet

image.gif编辑

二.Parquet Read By Scala

1.依赖准备与环境初始化

import org.apache.hadoop.fs.FileSystem
import org.apache.flink.formats.parquet.ParquetRowInputFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{MessageType, PrimitiveType, Type}

image.gif

Flink 读取 parquet 除了正常 Flink 环境相关依赖外,还需要加载单独的 Parquet 组件:

<dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

image.gif

本文基于 Flink-1.13.1 + scala-2.12.8 + hadoop-2.6.0 的运行环境,不同版本下可能需要更换上述 parquet 相关依赖。下面初始化 Flink ExecutionEnvironment,因为流式处理的原因,这里初始化环境类型为 Stream:

val env = StreamExecutionEnvironment.getExecutionEnvironment

image.gif

2.推断 Schem 读取 Parquet

parquet 通过列式存储数据,所以需要 schema 标定每一列的数据类型与名称,与 Spark 类似, Flink 也可以通过 Parquet 文件推断其对应 schema 并读取 Parquet。

def readParquetWithInferSchema(env: StreamExecutionEnvironment): Unit = {
    val filePath = "./test.parquet"
    val configuration = new org.apache.hadoop.conf.Configuration(true)
    val parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(filePath))
    val schema: MessageType = parquetFileReader.getFileMetaData.getSchema
    println(s"Schema: $schema")
    val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(configuration)
    val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    rowData.map(row => {
      val source = row.getField(1)
      val flag = row.getField(35)
      source + "\t" + flag
    }).setParallelism(1).print()
  }

image.gif

通过 parquetFileReader 获取元数据 MetaData 并获取 parquet 对应 schema,最终通过 env.readFile 方法指定 InputFormat 为 ParquetRowInputFormat 读取 parquet 文件,先看一下打印出来的 schema 形式:

image.gif编辑

由于读取的 parquet 为 SparkSession 生成,所以列名采用了 Spark 的默认形式 _c1,_c2 ...

env.execute("ReadParquet")

image.gif

调用执行方法运行上述 print demo 打印最终结果。

Tips:

这里的 Row 类型为 org.apache.flink.types.Row 而不再是 org.apache.spark.sql.Row,获取元素的方法也不再是 row.getString 或其他,而是采用 getFiled 传入 position 或者 列名 得到,索引从 0 开始。

image.gif编辑

3.指定 schema 读取 Parquet

除了 infer 推理得到 schema 外,读取也支持自定义 schema,与 spark 类似,这里也提供了 PrimitiveType 指定每一列的数据类型,并合并为 MessageType 得到最终的 schema。

def readParquetWithAssignSchema(env: StreamExecutionEnvironment): Unit = {
    val filePath = "./test.parquet"
    val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c0")
    val source = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c1")
    val flag = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c35")
    val typeArray = Array(id, source, flag)
    val typeListAsJava = java.util.Arrays.asList(typeArray: _*).asInstanceOf[java.util.List[Type]]
    val schema = new MessageType("schema", typeListAsJava)
    println(schema)
    val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    rowData.map(row => {
      val source = row.getField(1)
      val flag = row.getField(2)
      source + "\t" + flag
    }).setParallelism(1).print()
  }

image.gif

上面读取的 test.parquet 有 40+ col,这里只读取第 1,2,35 列,所以单独指定 id,source,flag 三列生成 PrimitiveType 并添加至 MessageType 形成 schema,由于 MessageType 为 Java 参数,所以需要通过 asList + asInstance 进行转化,看一下当前的 schema 情况:

image.gif编辑

env.execute("ReadParquet")

image.gif

调用执行方法执行上述 print 逻辑即可。

Tips:

这里列名给出了 _c0, _c1,_c35,但是读取是 position 索引只能选取 0,1,2,因为 schema 数量决定了读取 Row 的列数,而 schema 的列名决定了读取的内容,在该 schema 基础下读取 getField(35) 会报数组越界  java.lang.ArrayIndexOutOfBoundsException:

image.gif编辑

三. Parquet Read By Java

java 读取与 scala 大同小异,主要差别是 map 变为 MapFunction,这里直接贴完整函数方法:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
/**
 * @title: ReadParquetByJava
 * @Author DDD
 * @Date: 2022/7/21 8:36 上午
 * @Version 1.0
 */
public class ReadParquetByJava {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "./test.parquet";
        Configuration configuration = new org.apache.hadoop.conf.Configuration(true);
        FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(configuration);
        ParquetMetadata parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(path));
        MessageType schema = parquetFileReader.getFileMetaData().getSchema();
        System.out.println("-----Schema-----");
        System.out.println(schema);
        env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)
                .setParallelism(1)
                .map(new MapFunction<Row, String>() {
                    @Override
                    public String map(Row row) throws Exception {
                        try {
                            String source = String.valueOf(row.getField(1));
                            String flag = String.valueOf(row.getField(35));
                            return source + "\t" + flag;
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                }).print();
        env.execute("ReadParquetByJava");
    }
}

image.gif

四.总结

Parquet 通过其列式存储与空间压缩应用于多种大数据场景,上面给出了 parquet 文件转 DataStream 的两种方式,同理也可以使用 DataSet 加载为静态数据,上面两个方法都给出了 hdfs: FileSystem 变量但都没有使用,下面说下使用场景:

一般分布式任务读取时对应的 parquet 文件不是一个而是多个,所以需要从目标目录中找出第一个合法的 parquet 文件供 ParquetFileReader 解析对应的 schema,hdfs 的任务就是通过目标路径获取第一个合法文件使用。

def getFirstFilePath(hdfsPath: String, hdfs: FileSystem): String = {
    val files = hdfs.listFiles(new org.apache.hadoop.fs.Path(hdfsPath), false)
    var flag = true
    var firstFile = ""
    while (flag) {
      if (files.hasNext) {
        firstFile = files.next().getPath.getName
        if (!firstFile.equalsIgnoreCase(s"_SUCCESS")
          && !firstFile.startsWith(".")
          && firstFile.endsWith(".parquet")) {
          flag = false
        }
      } else {
        flag = false
      }
    }
    hdfsPath + "/" + firstFile
  }

image.gif

合法的判断需要三个条件:

A.不包含 _SUCCESS

B.不以 '.' 开头

C.以 '.parquet' 结尾

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
84 9
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
5天前
|
人工智能 自然语言处理 Java
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
51 9
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
|
26天前
|
Java
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
java实现从HDFS上下载文件及文件夹的功能,以流形式输出,便于用户自定义保存任何路径下
86 34
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
45 3
|
2月前
|
Java 测试技术 Maven
Maven clean 提示文件 java.io.IOException
在使用Maven进行项目打包时,遇到了`Failed to delete`错误,尝试手动删除目标文件也失败,提示`java.io.IOException`。经过分析,发现问题是由于`sys-info.log`文件被其他进程占用。解决方法是关闭IDEA和相关Java进程,清理隐藏的Java进程后重新尝试Maven clean操作。最终问题得以解决。总结:遇到此类问题时,可以通过任务管理器清理相关进程或重启电脑来解决。
|
2月前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
129 2
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1378 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
4天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。