Hadoop支持的文件格式之Parquet(上)

简介: Hadoop支持的文件格式之Parquet(上)

0x00 文章内容


  1. 行存储与列存储
  2. 编码实现Parquet格式的读写
  3. 彩蛋


0x01 行存储与列存储


1. Avro与Parquet

a. 请参考文章:Hadoop支持的文件格式之Avro0x01 行存储与列存储


0x02 编码实现Parquet格式的读写


1. 编码实现读写Parquet文件

a. 引入Parquet相关jar包

    <!--添加Parquet依赖-->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-column</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.8.1</version>
    </dependency>


b. 完整的写Parquet文件代码(写到HDFS

package com.shaonaiyi.hadoop.filetype.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:14
 * @Description 编码实现写Parquet文件
 */
public class ParquetFileWriter {
    public static void main(String[] args) throws IOException {
        MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
                "    required binary name;\n" +
                "    required int32 age;\n" +
                "    required int32 favorite_number;\n" +
                "    required binary favorite_color;\n" +
                "}");
        Configuration configuration = new Configuration();
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");
        GroupWriteSupport writeSupport = new GroupWriteSupport();
        GroupWriteSupport.setSchema(schema, configuration);
        ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetProperties.WriterVersion.PARQUET_1_0, configuration);
        GroupFactory groupFactory = new SimpleGroupFactory(schema);
        Group group = groupFactory.newGroup()
                .append("name", "shaonaiyi")
                .append("age", 18)
                .append("favorite_number", 7)
                .append("favorite_color", "red");
        writer.write(group);
        writer.close();
    }
}


c. 完整的读Parquet文件代码(从HDFS读)

package com.shaonaiyi.hadoop.filetype.parquet;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:18
 * @Description 编码实现读Parquet文件
 */
public class ParquetFileReader {
    public static void main(String[] args) throws IOException {
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/parquet-data.parquet");
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<>(path, readSupport);
        Group result = reader.read();
        System.out.println("name:" + result.getString("name", 0).toString());
        System.out.println("age:" + result.getInteger("age", 0));
        System.out.println("favorite_number:" + result.getInteger("favorite_number", 0));
        System.out.println("favorite_color:" + result.getString("favorite_color", 0));
    }
}


2. 查看读写Parquet文件结果

a. 写Parquet文件

image.png


b. 读Parquet文件

image.png


3. 编码实现读写Parquet文件(HDFS)

a. 引入Parquet与Avro关联的jar包

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.8.1</version>
    </dependency>


从上面的代码我们可以看出,以下面这种方式定义Schema很不友好:

    MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
            "    required binary name;\n" +
            "    required int32 age;\n" +
            "    required int32 favorite_number;\n" +
            "    required binary favorite_color;\n" +
            "}");


所以我们可以将Parquet与Avro关联,直接使用Avro的Schema即可。

b. 完整的写Parquet文件代码(HDFS)

package com.shaonaiyi.hadoop.filetype.parquet;
import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:47
 * @Description 编码实现写Parquet文件(HDFS)
 */
public class MRAvroParquetFileWriter {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置job的相关属性
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);
        //3 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet"));
        //4 构建JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        //5 构建taskContext
        TaskAttemptID attemptId = new TaskAttemptID("attemptId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
        //6 构建OutputFormat实例
        OutputFormat format = job.getOutputFormatClass().newInstance();
        //7 设置OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);
        //8 获取writer写数据,写完关闭writer
        RecordWriter<Void, Person> writer = format.getRecordWriter(hadoopAttemptContext);
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");
        writer.write(null, person);
        writer.close(hadoopAttemptContext);
        //9 committer提交job和task
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }
}
相关文章
|
分布式计算 大数据 Hadoop
【大数据开发技术】实验03-Hadoop读取文件
【大数据开发技术】实验03-Hadoop读取文件
200 0
|
1月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
40 3
|
1月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
40 2
|
1月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
30 1
|
1月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
44 1
|
4月前
|
分布式计算 Hadoop Java
Hadoop编辑hadoop-env.sh文件
【7月更文挑战第19天】
289 5
|
4月前
|
分布式计算 Hadoop Shell
Hadoop修改bashrc或profile文件
【7月更文挑战第16天】
128 2
|
4月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
存储 分布式计算 NoSQL
|
5月前
|
数据采集 SQL 分布式计算

相关实验场景

更多