c. Complete code for reading a Parquet file (HDFS)
```java
package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.parquet.avro.AvroParquetInputFormat;

import java.io.IOException;
import java.util.List;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 10:52
 * @Description Reading a Parquet file (HDFS)
 */
public class MRAvroParquetFileReader {

    public static void main(String[] args) throws IOException {
        //1 Build a job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the full path of the file to read
        FileInputFormat.setInputPaths(job,
                "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet");

        //3 Create the input format and set the Avro read schema
        AvroParquetInputFormat<Person> inputFormat = new AvroParquetInputFormat<>();
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);

        //4 Get the split information for the file's data blocks
        //4.1 Find out how many splits the file was divided into
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

        //5 Read the records of every split
        inputSplits.forEach(inputSplit -> {
            TaskAttemptID attemptId =
                    new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
            TaskAttemptContext hadoopAttemptContext =
                    new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
            try {
                RecordReader<Void, Person> reader =
                        inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                reader.initialize(inputSplit, hadoopAttemptContext);
                while (reader.nextKeyValue()) {
                    // Parquet's input format never populates the key, so this prints null
                    System.out.println(reader.getCurrentKey());
                    Person person = reader.getCurrentValue();
                    System.out.println(person);
                }
                reader.close();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
```
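Because Parquet is a columnar format, the input format can also be told to materialize only a subset of columns. Below is a minimal sketch (not from the original code) of column projection with `AvroParquetInputFormat.setRequestedProjection`; the projection schema's namespace and field types are assumptions based on the Person fields used in this series:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class ProjectionExample {
    // Hypothetical helper: restricts the job to reading only name and age.
    public static void applyProjection(Job job) {
        // A subset of the Person schema; the namespace and field types are
        // assumptions inferred from the demo code in this article.
        Schema projection = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Person\","
                + "\"namespace\":\"com.shaonaiyi.hadoop.filetype.avro\","
                + "\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"}]}");
        AvroParquetInputFormat.setRequestedProjection(job, projection);
    }
}
```

Projection is where the columnar layout pays off: splits are still read split by split, but the column chunks outside the projection are skipped entirely on disk.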
4. Viewing the results of reading and writing the Parquet file (HDFS)
a. Writing the Parquet file (HDFS)
b. Reading the Parquet file (HDFS): the key has no value set (the Parquet input format declares its key type as Void, so getCurrentKey() always returns null)
0x03 Bonus
- Writing a standalone Parquet read/write demo
```java
package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/18 11:11
 * @Description A demo that writes and then reads back a Parquet file
 */
public class AvroParquetDemo {

    public static void main(String[] args) throws IOException {
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");

        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet2");

        // Write one record, compressed with Snappy
        ParquetWriter<Object> writer = AvroParquetWriter.builder(path)
                .withSchema(Person.SCHEMA$)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
        writer.write(person);
        writer.close();

        // Read the record back
        ParquetReader<Object> avroParquetReader = AvroParquetReader.builder(path).build();
        Person record = (Person) avroParquetReader.read();
        System.out.println("name:" + record.getName());
        System.out.println("age:" + record.get("age").toString());
        System.out.println("favorite_number:" + record.get("favorite_number").toString());
        System.out.println("favorite_color:" + record.get("favorite_color"));
        avroParquetReader.close();
    }
}
```
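The demo writes and reads a single record, so one `read()` call is enough. When a file holds many records, `read()` is normally called in a loop until it returns null. A minimal sketch using the same builder API (note that later parquet-avro releases deprecate the Path-based builders in favor of the InputFile/OutputFile variants):

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class AvroParquetReadAll {
    public static void main(String[] args) throws Exception {
        // Same path as the demo above
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro-parquet2");
        ParquetReader<Object> reader = AvroParquetReader.builder(path).build();
        Object record;
        // read() returns null once every record has been consumed
        while ((record = reader.read()) != null) {
            System.out.println(record);
        }
        reader.close();
    }
}
```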
The record can be read back from the console.
The data is also visible on HDFS.
0xFF Summary
- How to use these formats in a MapReduce job:
```java
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);

job.setOutputFormatClass(ParquetOutputFormat.class);
AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);
```
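For completeness, here is a minimal map-only driver sketch (not from the original article; the class name and path arguments are hypothetical) showing how those four calls fit into a whole job. It uses AvroParquetOutputFormat directly as the output format class, since that subclass bundles the Avro write support:

```java
package com.shaonaiyi.hadoop.filetype.parquet;

import com.shaonaiyi.hadoop.filetype.avro.Person;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

import java.io.IOException;

// Hypothetical pass-through job: copies Person records from one Parquet
// directory to another without transforming them.
public class ParquetPassThroughJob {

    public static class PassThroughMapper extends Mapper<Void, Person, Void, Person> {
        @Override
        protected void map(Void key, Person value, Context context)
                throws IOException, InterruptedException {
            context.write(null, value); // Parquet ignores the key
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ParquetPassThroughJob.class);
        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0); // map-only: mapper output goes straight to the output format

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setAvroReadSchema(job, Person.SCHEMA$);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        AvroParquetOutputFormat.setSchema(job, Person.SCHEMA$);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```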
In the article 网站用户行为分析项目之会话切割(二) (part 2 of the website user-behavior session-cutting project), step 9, saving the statistics, stores its results in exactly this Parquet format.
The Hadoop-supported file formats series:
- Hadoop支持的文件格式之Text
- Hadoop支持的文件格式之Avro
- Hadoop支持的文件格式之Parquet
- Hadoop支持的文件格式之SequenceFile