Hadoop支持的文件格式之SequenceFile(上)

简介: Hadoop支持的文件格式之SequenceFile(上)

0x00 文章内容


Hadoop支持的四种常用的文件格式:Text(csv)ParquetAvro以及SequenceFile,非常关键!


0x01 SequenceFile格式概念


1. SequenceFile是啥

二进制格式。


0x02 编码实现


1. 写文件完整代码
package com.shaonaiyi.hadoop.filetype.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.net.URI;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/20 11:27
 * @Description Hadoop支持的文件格式之写Sequence
 */
public class SequenceFileWriter {
    private static final String[] DATA = {
            "shao, naiyi, bigdata, hadoop",
            "naiyi, bigdata, spark",
            "yi, two, a good man"
    };
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://master:9999/user/hadoop-sny/mr/filetype/sequence.seq";
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), configuration);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(configuration,
                    SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(key.getClass()),
                    SequenceFile.Writer.valueClass(value.getClass()));
            for (int i = 0; i < 100; i++) {
                key.set(100 -i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            writer.close();
        }
    }
}


代码解读:根据配置文件、文件路径、key类型、value类型此四个参数构建SequenceFile的Writer对象,然后循环append进key和value

2. 读文件完整代码
package com.shaonaiyi.hadoop.filetype.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.net.URI;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/20 11:28
 * @Description Hadoop支持的文件格式之读Sequence
 */
public class SequenceFileReader {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://master:9999/user/hadoop-sny/mr/filetype/sequence.seq";
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), configuration);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(configuration, SequenceFile.Reader.file(path));
            Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), configuration);
            Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), configuration);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition();
            }
        } finally {
            reader.close();
        }
    }
}


3. 写文件完整代码(HDFS
package com.shaonaiyi.hadoop.filetype.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/20 12:53
 * @Description Hadoop支持的文件格式之写Sequence(HDFS)
 */
public class MRSequenceFileWriter {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置job的相关属性
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //3 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/sequence"));
        //4 构建JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        //5 构建taskContext
        TaskAttemptID attemptId = new TaskAttemptID("attemptId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
        //6 构建OutputFormat实例
        OutputFormat format = job.getOutputFormatClass().newInstance();
        //7 设置OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);
        //8 获取writer写数据,写完关闭writer
        RecordWriter<LongWritable, Text> writer = format.getRecordWriter(hadoopAttemptContext);
        String value = "shao";
        writer.write(new LongWritable(System.currentTimeMillis()), new Text(value));
        writer.close(hadoopAttemptContext);
        //9 committer提交job和task
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }
}


4. 读文件完整代码(HDFS)
package com.shaonaiyi.hadoop.filetype.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/20 14:17
 * @Description Hadoop支持的文件格式之读Sequence(HDFS)
 */
public class MRSequenceFileReader {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置需要读取的文件全路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/sequence");
        //3 获取读取文件的格式
        SequenceFileInputFormat inputFormat = SequenceFileInputFormat.class.newInstance();
        //4 获取需要读取文件的数据块的分区信息
        //4.1 获取文件被分成多少数据块了
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);
        //读取每一个数据块的数据
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader<LongWritable, Text> reader = null;
                try {
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        System.out.println(reader.getCurrentKey());
                        System.out.println(reader.getCurrentValue());
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
相关文章
|
分布式计算 大数据 Hadoop
【大数据开发技术】实验03-Hadoop读取文件
【大数据开发技术】实验03-Hadoop读取文件
234 0
|
3月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
50 3
|
3月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
50 2
|
3月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
47 1
|
3月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
59 1
|
6月前
|
分布式计算 Hadoop Java
Hadoop编辑hadoop-env.sh文件
【7月更文挑战第19天】
370 5
|
6月前
|
分布式计算 Hadoop Shell
Hadoop修改bashrc或profile文件
【7月更文挑战第16天】
185 2
|
6月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
存储 分布式计算 NoSQL
|
7月前
|
数据采集 SQL 分布式计算

相关实验场景

更多