0x00 Article Contents
- Row storage vs. column storage
- Reading and writing the Parquet format in code
- Easter egg
0x01 Row Storage vs. Column Storage
1. Avro and Parquet
a. See the "0x01 Row Storage vs. Column Storage" section of the companion article on Avro (Hadoop支持的文件格式之Avro).
0x02 Reading and Writing Parquet in Code
1. Reading and writing Parquet files in code
a. Add the Parquet dependencies
<!-- Add Parquet dependencies -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
b. Complete code for writing a Parquet file (to HDFS)
package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:14
 * @Description Write a Parquet file programmatically
 */
public class ParquetFileWriter {

    public static void main(String[] args) throws IOException {

        // Define the schema with Parquet's own MessageType
        MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
                " required binary name;\n" +
                " required int32 age;\n" +
                " required int32 favorite_number;\n" +
                " required binary favorite_color;\n" +
                "}");

        Configuration configuration = new Configuration();
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");

        GroupWriteSupport writeSupport = new GroupWriteSupport();
        GroupWriteSupport.setSchema(schema, configuration);

        ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetProperties.WriterVersion.PARQUET_1_0,
                configuration);

        // Build one record and write it out
        GroupFactory groupFactory = new SimpleGroupFactory(schema);
        Group group = groupFactory.newGroup()
                .append("name", "shaonaiyi")
                .append("age", 18)
                .append("favorite_number", 7)
                .append("favorite_color", "red");

        writer.write(group);
        writer.close();
    }

}
c. Complete code for reading a Parquet file (from HDFS)
package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:18
 * @Description Read a Parquet file programmatically
 */
public class ParquetFileReader {

    public static void main(String[] args) throws IOException {

        // Read back the file written by ParquetFileWriter
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/parquet/data.parquet");
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader<Group> reader = new ParquetReader<>(path, readSupport);

        Group result = reader.read();
        System.out.println("name:" + result.getString("name", 0));
        System.out.println("age:" + result.getInteger("age", 0));
        System.out.println("favorite_number:" + result.getInteger("favorite_number", 0));
        System.out.println("favorite_color:" + result.getString("favorite_color", 0));

        reader.close();
    }

}
2. Checking the read/write results
a. Writing the Parquet file
b. Reading the Parquet file (a small verification sketch follows below)
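A quick way to confirm that the write succeeded is to list the output directory on HDFS. The snippet below is only a minimal sketch, assuming the same NameNode address and output path as ParquetFileWriter above; the class name ParquetOutputChecker and the printed sizes are illustrative, not part of the original code.

package com.shaonaiyi.hadoop.filetype.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

/**
 * Hypothetical helper: lists the Parquet output directory so you can
 * confirm that data.parquet was created and has a non-zero size.
 */
public class ParquetOutputChecker {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same NameNode address as used by ParquetFileWriter above (assumption)
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9999"), conf);

        Path dir = new Path("/user/hadoop-sny/mr/filetype/parquet");
        for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath() + "\t" + status.getLen() + " bytes");
        }
    }

}

Running ParquetFileReader afterwards should print the four fields of the record written above (name, age, favorite_number, favorite_color).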
3. Reading and writing Parquet files in code (HDFS)
a. Add the Parquet-Avro integration dependency
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>
As the code above shows, defining the schema in the following way is quite unfriendly:
MessageType schema = MessageTypeParser.parseMessageType("message Person {\n" +
        " required binary name;\n" +
        " required int32 age;\n" +
        " required int32 favorite_number;\n" +
        " required binary favorite_color;\n" +
        "}");
So we can integrate Parquet with Avro and use an Avro schema directly instead.
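For comparison, here is what the equivalent schema looks like on the Avro side. This is only an illustrative sketch built with Avro's SchemaBuilder; in the listing below, the schema actually comes from the generated Person class (Person.SCHEMA$) from the Avro article, and the namespace used here is an assumption based on that class's package.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class PersonSchemaSketch {
    public static void main(String[] args) {
        // Illustrative only: an Avro schema equivalent to the Parquet MessageType above.
        // The real schema comes from the generated Person class (Person.SCHEMA$);
        // the namespace below is assumed from that class's package.
        Schema personSchema = SchemaBuilder.record("Person")
                .namespace("com.shaonaiyi.hadoop.filetype.avro")
                .fields()
                .requiredString("name")
                .requiredInt("age")
                .requiredInt("favorite_number")
                .requiredString("favorite_color")
                .endRecord();

        System.out.println(personSchema.toString(true)); // print the schema as pretty JSON
    }
}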
b. Complete code for writing a Parquet file (HDFS)
package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:47
 * @Description Write a Parquet file programmatically (HDFS)
 */
public class MRAvroParquetFileWriter {

    public static void main(String[] args) throws IOException, IllegalAccessException,
            InstantiationException, ClassNotFoundException, InterruptedException {

        //1 Build a job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the job's properties
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);

        //3 Set the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet"));

        //4 Build a JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

        //5 Build a TaskAttemptContext
        TaskAttemptID attemptId = new TaskAttemptID("attemptId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

        //6 Instantiate the OutputFormat
        OutputFormat format = job.getOutputFormatClass().newInstance();

        //7 Set up the OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);

        //8 Get a writer, write the data, then close the writer
        RecordWriter<Void, Person> writer = format.getRecordWriter(hadoopAttemptContext);
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");
        writer.write(null, person);
        writer.close(hadoopAttemptContext);

        //9 Commit the task and the job
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }

}
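Only the write side is shown here; reading the file back is symmetrical. The sketch below is a minimal, assumed reader built on parquet-avro's AvroParquetReader, not code from the original article. The class name AvroParquetFileReader and the part-file name are assumptions: check the actual file name that the committer produced under the avro-parquet output directory and substitute it.

package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;

/**
 * Sketch of the read side: reads one Person record back with AvroParquetReader.
 */
public class AvroParquetFileReader {

    public static void main(String[] args) throws IOException {
        // NOTE: the part-file name is an assumption; list
        // hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet first
        // and use the file that is actually there.
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet/part-r-00000.parquet");

        // With the generated Person class on the classpath, parquet-avro
        // deserializes records directly into Person instances.
        ParquetReader<Person> reader = AvroParquetReader.<Person>builder(path).build();
        Person person = reader.read();
        System.out.println(person);
        reader.close();
    }

}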