Hadoop支持的文件格式之Parquet(下)

简介: Hadoop支持的文件格式之Parquet(下)

c. 完整的读Parquet文件代码(HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;
import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetInputFormat;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:52
 * @Description 编码实现读Parquet文件(HDFS)
 */
public class MRAvroParquetFileReader {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置需要读取的文件全路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet");
        //3 获取读取文件的格式
        AvroParquetInputFormat inputFormat = AvroParquetInputFormat.class.newInstance();
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
        //AvroJob.setInputKeySchema(job, Person.SCHEMA$);
        //4 获取需要读取文件的数据块的分区信息
        //4.1 获取文件被分成多少数据块了
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);
        //读取每一个数据块的数据
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader<NullWritable, Person> reader = null;
                try {
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        System.out.println(reader.getCurrentKey());
                        Person person = reader.getCurrentValue();
                        System.out.println(person);
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}


4. 查看读写Parquet文件(HDFS)结果

a. 写Parquet文件(HDFS)

微信图片_20220618231223.png


b. 读Parquet文件(HDFS),Key没有设置值

image.png


0x03 彩蛋


  1. 编写读写Parquet文件Demo


package com.shaonaiyi.hadoop.filetype.parquet;
import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 11:11
 * @Description 编写读写Parquet文件Demo
 */
public class AvroParquetDemo {
    public static void main(String[] args) throws IOException {
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet2");
        ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withSchema(Person.SCHEMA$)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
        writer.write(person);
        writer.close();
        ParquetReader<Object> avroParquetReader = AvroParquetReader.builder(path).build();
        Person record = (Person)avroParquetReader.read();
        System.out.println("name:" + record.getName());
        System.out.println("age:" + record.get("age").toString());
        System.out.println("favorite_number:" + record.get("favorite_number").toString());
        System.out.println("favorite_color:" + record.get("favorite_color"));
    }
}


控制台可以读出文件


image.png


HDFS上也有数据了


image.png


0xFF 总结


  1. 在MapReduce作业中如何使用:
    job.setInputFormatClass(AvroParquetInputFormat.class);
    AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
    job.setOutputFormatClass(ParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);


文章:网站用户行为分析项目之会话切割(二) 中 9. 保存统计结果 时就是以Parquet的格式保存下来的。

Hadoop支持的文件格式系列:

Hadoop支持的文件格式之Text

Hadoop支持的文件格式之Avro

Hadoop支持的文件格式之Parquet

Hadoop支持的文件格式之SequenceFile


相关文章
|
分布式计算 大数据 Hadoop
【大数据开发技术】实验03-Hadoop读取文件
【大数据开发技术】实验03-Hadoop读取文件
235 0
|
3月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
52 3
|
3月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
51 2
|
3月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
48 1
|
3月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
60 1
|
6月前
|
分布式计算 Hadoop Java
Hadoop编辑hadoop-env.sh文件
【7月更文挑战第19天】
374 5
|
6月前
|
分布式计算 Hadoop Shell
Hadoop修改bashrc或profile文件
【7月更文挑战第16天】
187 2
|
6月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
存储 分布式计算 NoSQL
|
7月前
|
数据采集 SQL 分布式计算

相关实验场景

更多