Hadoop支持的文件格式之Avro(中)

简介: Hadoop支持的文件格式之Avro(中)
2. 查看读写Avro文件结果

a. 写Avro文件

image.png


b. 读Avro文件


image.png


3. 编码实现读写Avro文件(HDFS

a. 引入所需要的jar包


    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.8.0</version>
    </dependency>


b. 写Avro文件到HDFS完整代码


package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:15
 * @Description 编码实现写Avro文件到HDFS
 */
public class MRAvroFileWriter {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置job的相关属性
//        job.setOutputKeyClass(NullWritable.class);
//        job.setOutputValueClass(Text.class);
//        job.setOutputFormatClass(TextOutputFormat.class);
        //job.setOutputKeyClass(AvroKey.class);
        //job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setOutputKeySchema(job, Person.SCHEMA$);
        //3 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro"));
        //FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //4 构建JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        //5 构建taskContext
        TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
        //6 构建OutputFormat实例
        OutputFormat format = job.getOutputFormatClass().newInstance();
        //7 设置OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);
        //8 获取writer写数据,写完关闭writer
        RecordWriter<AvroKey, Person> writer = format.getRecordWriter(hadoopAttemptContext);
//        writer.write(null, new Text("shao"));
//        writer.write(null, new Text("nai"));
//        writer.write(null, new Text("yi"));
//        writer.write(null, new Text("bigdata-man"));
        Person person = new Person();
        person.setName("jeffy");
        person.setAge(20);
        person.setFavoriteNumber(10);
        person.setFavoriteColor("red");
        writer.write(new AvroKey(person), null);
        writer.close(hadoopAttemptContext);
        //9 committer提交job和task
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }
}


与写Text格式(文章链接跳转:Hadoop支持的文件格式之Text)时类似,主要不同如下:

微信图片_20220618230326.png


c. 从HDFS上读Avro文件完整代码

package com.shaonaiyi.hadoop.filetype.avro;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:29
 * @Description 编码实现从HDFS上读Avro文件
 */
public class MRAvroFileReader {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置需要读取的文件全路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro");
        //3 获取读取文件的格式
//        TextInputFormat inputFormat = TextInputFormat.class.newInstance();
        AvroKeyInputFormat inputFormat = AvroKeyInputFormat.class.newInstance();
        //4 获取需要读取文件的数据块的分区信息
        //4.1 获取文件被分成多少数据块了
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);
        //读取每一个数据块的数据
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
//                RecordReader reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                RecordReader<AvroKey<Person>, NullWritable> reader = null;
                try {
//                    reader.initialize(inputSplit, hadoopAttemptContext);
//                    System.out.println("<key,value>");
//                    System.out.println("-----------");
//                    while (reader.nextKeyValue()) {
//                        System.out.println("<"+reader.getCurrentKey() + "," + reader.getCurrentValue()+ ">" );
//                    }
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        Person person = reader.getCurrentKey().datum();
                        System.out.println("key=>" + person);
                        System.out.println("value=>" + reader.getCurrentValue());
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}



相关文章
|
分布式计算 大数据 Hadoop
【大数据开发技术】实验03-Hadoop读取文件
【大数据开发技术】实验03-Hadoop读取文件
214 0
|
3月前
|
存储 大数据 测试技术
用于大数据分析的数据存储格式:Parquet、Avro 和 ORC 的性能和成本影响
在大数据环境中,数据存储格式直接影响查询性能和成本。本文探讨了 Parquet、Avro 和 ORC 三种格式在 Google Cloud Platform (GCP) 上的表现。Parquet 和 ORC 作为列式存储格式,在压缩和读取效率方面表现优异,尤其适合分析工作负载;Avro 则适用于需要快速写入和架构演化的场景。通过对不同查询类型(如 SELECT、过滤、聚合和联接)的基准测试,本文提供了在各种使用案例中选择最优存储格式的建议。研究结果显示,Parquet 和 ORC 在读取密集型任务中更高效,而 Avro 更适合写入密集型任务。正确选择存储格式有助于显著降低成本并提升查询性能。
503 1
用于大数据分析的数据存储格式:Parquet、Avro 和 ORC 的性能和成本影响
|
2月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
45 3
|
2月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
41 2
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
38 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
52 1
|
5月前
|
分布式计算 Hadoop Java
Hadoop编辑hadoop-env.sh文件
【7月更文挑战第19天】
337 5
|
5月前
|
分布式计算 Hadoop Shell
Hadoop修改bashrc或profile文件
【7月更文挑战第16天】
165 2
|
5月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
存储 分布式计算 NoSQL

相关实验场景

更多