c. Complete code for reading an Avro file from HDFS
package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:29
 * @Description Read an Avro file from HDFS
 */
public class MRAvroFileReader {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {

        //1 Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the full path of the file to read
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro");

        //3 Pick the input format for the file
        //  (the Text version used: TextInputFormat inputFormat = TextInputFormat.class.newInstance();)
        AvroKeyInputFormat inputFormat = AvroKeyInputFormat.class.newInstance();

        //4 Get the split information for the file's data blocks
        //4.1 Find out how many splits the file was divided into
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

        // Read the records of each split
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader<AvroKey<Person>, NullWritable> reader = null;
                try {
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        // The record travels in the key as an AvroKey<Person>; the value is always NullWritable
                        Person person = reader.getCurrentKey().datum();
                        System.out.println("key=>" + person);
                        System.out.println("value=>" + reader.getCurrentValue());
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

    }

}
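As an aside, the JobContext/TaskAttemptContext plumbing above is only needed because we are driving the MapReduce InputFormat by hand. Below is a minimal sketch of reading the same container file directly through Avro's file API instead; the part-file name is hypothetical (list the output directory to find the real one), and GenericRecord is used so the sketch does not depend on the generated Person class:

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class AvroDirectFileReader {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical file name -- check the actual name under the output directory
        Path path = new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro/part-m-00000.avro");
        // FsInput adapts an HDFS file to the SeekableInput that DataFileReader expects
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(new FsInput(path, conf), new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                System.out.println(record); // each record prints in a JSON-like form
            }
        }
    }
}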
This is very similar to reading the Text format (see the earlier article: Hadoop支持的文件格式之Text); the main differences are:

- The input format is AvroKeyInputFormat instead of TextInputFormat.
- The record reader is typed RecordReader<AvroKey<Person>, NullWritable> rather than RecordReader<LongWritable, Text>: the whole record travels in the key, and the value is always NullWritable.
- To get at the record, call reader.getCurrentKey().datum(), which unwraps the AvroKey into a Person object (see the small example below).
- The input path points at the directory where the Avro file was written in the previous step.
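Because the record rides in the key, it is worth seeing that AvroKey is only a thin wrapper: datum() returns the wrapped object. A tiny self-contained sketch (the String payload is just for illustration):

import org.apache.avro.mapred.AvroKey;

public class AvroKeyDatumDemo {
    public static void main(String[] args) {
        // AvroKey wraps an Avro datum; datum() unwraps it again
        AvroKey<String> key = new AvroKey<>("shaonaiyi");
        System.out.println(key.datum()); // prints: shaonaiyi
    }
}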
4. Checking the results of reading and writing the Avro file (HDFS)
a. Write result
b. Read result. Since we never set any field values in the write-side code, each Person record prints with its default values (null for unset fields).
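If you want the reader to print real values, the write-side code has to populate the record before writing it. A hedged sketch, assuming Person is the Avro-generated class from the write-side article and, purely hypothetically, has a name field (generated classes always expose newBuilder(); the available setters depend on the actual schema):

package com.shaonaiyi.hadoop.filetype.avro;

public class PersonBuilderSketch {
    public static void main(String[] args) {
        // Hypothetical field: the real Person schema may define different fields
        Person person = Person.newBuilder()
                .setName("shaonaiyi")
                .build();
        System.out.println(person); // the generated toString() prints the record as JSON
    }
}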