Flink - 读取 Parquet 文件 By Scala / Java

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。

一.引言

parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。Parquet 相关知识可以参考:Spark - 一文搞懂 parquet

image.gif编辑

二.Parquet Read By Scala

1.依赖准备与环境初始化

import org.apache.hadoop.fs.FileSystem
import org.apache.flink.formats.parquet.ParquetRowInputFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{MessageType, PrimitiveType, Type}

image.gif

Flink 读取 parquet 除了正常 Flink 环境相关依赖外,还需要加载单独的 Parquet 组件:

<dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

image.gif

本文基于 Flink-1.13.1 + scala-2.12.8 + hadoop-2.6.0 的运行环境,不同版本下可能需要更换上述 parquet 相关依赖。下面初始化 Flink ExecutionEnvironment,因为流式处理的原因,这里初始化环境类型为 Stream:

val env = StreamExecutionEnvironment.getExecutionEnvironment

image.gif

2.推断 Schem 读取 Parquet

parquet 通过列式存储数据,所以需要 schema 标定每一列的数据类型与名称,与 Spark 类似, Flink 也可以通过 Parquet 文件推断其对应 schema 并读取 Parquet。

def readParquetWithInferSchema(env: StreamExecutionEnvironment): Unit = {
    val filePath = "./test.parquet"
    val configuration = new org.apache.hadoop.conf.Configuration(true)
    val parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(filePath))
    val schema: MessageType = parquetFileReader.getFileMetaData.getSchema
    println(s"Schema: $schema")
    val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(configuration)
    val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    rowData.map(row => {
      val source = row.getField(1)
      val flag = row.getField(35)
      source + "\t" + flag
    }).setParallelism(1).print()
  }

image.gif

通过 parquetFileReader 获取元数据 MetaData 并获取 parquet 对应 schema,最终通过 env.readFile 方法指定 InputFormat 为 ParquetRowInputFormat 读取 parquet 文件,先看一下打印出来的 schema 形式:

image.gif编辑

由于读取的 parquet 为 SparkSession 生成,所以列名采用了 Spark 的默认形式 _c1,_c2 ...

env.execute("ReadParquet")

image.gif

调用执行方法运行上述 print demo 打印最终结果。

Tips:

这里的 Row 类型为 org.apache.flink.types.Row 而不再是 org.apache.spark.sql.Row,获取元素的方法也不再是 row.getString 或其他,而是采用 getFiled 传入 position 或者 列名 得到,索引从 0 开始。

image.gif编辑

3.指定 schema 读取 Parquet

除了 infer 推理得到 schema 外,读取也支持自定义 schema,与 spark 类似,这里也提供了 PrimitiveType 指定每一列的数据类型,并合并为 MessageType 得到最终的 schema。

def readParquetWithAssignSchema(env: StreamExecutionEnvironment): Unit = {
    val filePath = "./test.parquet"
    val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c0")
    val source = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c1")
    val flag = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c35")
    val typeArray = Array(id, source, flag)
    val typeListAsJava = java.util.Arrays.asList(typeArray: _*).asInstanceOf[java.util.List[Type]]
    val schema = new MessageType("schema", typeListAsJava)
    println(schema)
    val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    rowData.map(row => {
      val source = row.getField(1)
      val flag = row.getField(2)
      source + "\t" + flag
    }).setParallelism(1).print()
  }

image.gif

上面读取的 test.parquet 有 40+ col,这里只读取第 1,2,35 列,所以单独指定 id,source,flag 三列生成 PrimitiveType 并添加至 MessageType 形成 schema,由于 MessageType 为 Java 参数,所以需要通过 asList + asInstance 进行转化,看一下当前的 schema 情况:

image.gif编辑

env.execute("ReadParquet")

image.gif

调用执行方法执行上述 print 逻辑即可。

Tips:

这里列名给出了 _c0, _c1,_c35,但是读取是 position 索引只能选取 0,1,2,因为 schema 数量决定了读取 Row 的列数,而 schema 的列名决定了读取的内容,在该 schema 基础下读取 getField(35) 会报数组越界  java.lang.ArrayIndexOutOfBoundsException:

image.gif编辑

三. Parquet Read By Java

java 读取与 scala 大同小异,主要差别是 map 变为 MapFunction,这里直接贴完整函数方法:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
/**
 * @title: ReadParquetByJava
 * @Author DDD
 * @Date: 2022/7/21 8:36 上午
 * @Version 1.0
 */
public class ReadParquetByJava {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "./test.parquet";
        Configuration configuration = new org.apache.hadoop.conf.Configuration(true);
        FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(configuration);
        ParquetMetadata parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(path));
        MessageType schema = parquetFileReader.getFileMetaData().getSchema();
        System.out.println("-----Schema-----");
        System.out.println(schema);
        env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)
                .setParallelism(1)
                .map(new MapFunction<Row, String>() {
                    @Override
                    public String map(Row row) throws Exception {
                        try {
                            String source = String.valueOf(row.getField(1));
                            String flag = String.valueOf(row.getField(35));
                            return source + "\t" + flag;
                        } catch (Exception e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                }).print();
        env.execute("ReadParquetByJava");
    }
}

image.gif

四.总结

Parquet 通过其列式存储与空间压缩应用于多种大数据场景,上面给出了 parquet 文件转 DataStream 的两种方式,同理也可以使用 DataSet 加载为静态数据,上面两个方法都给出了 hdfs: FileSystem 变量但都没有使用,下面说下使用场景:

一般分布式任务读取时对应的 parquet 文件不是一个而是多个,所以需要从目标目录中找出第一个合法的 parquet 文件供 ParquetFileReader 解析对应的 schema,hdfs 的任务就是通过目标路径获取第一个合法文件使用。

def getFirstFilePath(hdfsPath: String, hdfs: FileSystem): String = {
    val files = hdfs.listFiles(new org.apache.hadoop.fs.Path(hdfsPath), false)
    var flag = true
    var firstFile = ""
    while (flag) {
      if (files.hasNext) {
        firstFile = files.next().getPath.getName
        if (!firstFile.equalsIgnoreCase(s"_SUCCESS")
          && !firstFile.startsWith(".")
          && firstFile.endsWith(".parquet")) {
          flag = false
        }
      } else {
        flag = false
      }
    }
    hdfsPath + "/" + firstFile
  }

image.gif

合法的判断需要三个条件:

A.不包含 _SUCCESS

B.不以 '.' 开头

C.以 '.parquet' 结尾

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
1月前
|
Java Unix Go
【Java】(8)Stream流、文件File相关操作,IO的含义与运用
Java 为 I/O 提供了强大的而灵活的支持,使其更广泛地应用到文件传输和网络编程中。!但本节讲述最基本的和流与 I/O 相关的功能。我们将通过一个个例子来学习这些功能。
161 3
|
4月前
|
监控 Java API
Java语言按文件创建日期排序及获取最新文件的技术
这段代码实现了文件创建时间的读取、文件列表的获取与排序以及获取最新文件的需求。它具备良好的效率和可读性,对于绝大多数处理文件属性相关的需求来说足够健壮。在实际应用中,根据具体情况,可能还需要进一步处理如访问权限不足、文件系统不支持某些属性等边界情况。
253 14
|
4月前
|
存储 Java 编译器
深入理解Java虚拟机--类文件结构
本内容介绍了Java虚拟机与Class文件的关系及其内部结构。Class文件是一种与语言无关的二进制格式,包含JVM指令集、符号表等信息。无论使用何种语言,只要能生成符合规范的Class文件,即可在JVM上运行。文章详细解析了Class文件的组成,包括魔数、版本号、常量池、访问标志、类索引、字段表、方法表和属性表等,并说明其在Java编译与运行过程中的作用。
135 0
|
4月前
|
存储 人工智能 Java
java之通过Http下载文件
本文介绍了使用Java实现通过文件链接下载文件到本地的方法,主要涉及URL、HttpURLConnection及输入输出流的操作。
302 0
|
5月前
|
存储 Java 数据安全/隐私保护
Java技术栈揭秘:Base64加密和解密文件的实战案例
以上就是我们今天关于Java实现Base64编码和解码的实战案例介绍。希望能对你有所帮助。还有更多知识等待你去探索和学习,让我们一同努力,继续前行!
467 5
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
334 0
|
5月前
|
网络协议 安全 Java
实现Java语言的文件断点续传功能的技术方案。
像这样,我们就完成了一项看似高科技、实则亲民的小工程。这样的技术实现不仅具备实用性,也能在面对网络不稳定的挑战时,稳稳地、不失乐趣地完成工作。
331 0
|
11月前
|
人工智能 自然语言处理 Java
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
FastExcel 是一款基于 Java 的高性能 Excel 处理工具,专注于优化大规模数据处理,提供简洁易用的 API 和流式操作能力,支持从 EasyExcel 无缝迁移。
2514 65
FastExcel:开源的 JAVA 解析 Excel 工具,集成 AI 通过自然语言处理 Excel 文件,完全兼容 EasyExcel
|
8月前
|
前端开发 Cloud Native Java
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
|
9月前
|
存储 算法 Java
解锁“分享文件”高效密码:探秘 Java 二叉搜索树算法
在信息爆炸的时代,文件分享至关重要。二叉搜索树(BST)以其高效的查找性能,为文件分享优化提供了新路径。本文聚焦Java环境下BST的应用,介绍其基础结构、实现示例及进阶优化。BST通过有序节点快速定位文件,结合自平衡树、多线程和权限管理,大幅提升文件分享效率与安全性。代码示例展示了文件插入与查找的基本操作,适用于大规模并发场景,确保分享过程流畅高效。掌握BST算法,助力文件分享创新发展。
下一篇
oss云网关配置