Hadoop支持的文件格式之Text

简介: Hadoop支持的文件格式之Text

0x00 文章内容


  1. Text格式概念
  2. 编码实现
  3. 校验结果
  4. 可能出现的问题解决

Hadoop支持的四种常用的文件格式:Text(csv)ParquetAvro以及SequenceFile,非常关键!


0x01 Text格式概念


1. Text是啥

普通的文档格式,txt格式。


0x02 编码实现


1. 写文件完整代码
package com.shaonaiyi.hadoop.filetype.text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 11:20
 * @Description Hadoop支持的文件格式之写Text
 */
public class MRTextFileWriter {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException, InterruptedException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置job的相关属性
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //3 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/text"));
        //FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //4 构建JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        //5 构建taskContext
        TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
        //6 构建OutputFormat实例
        OutputFormat format = job.getOutputFormatClass().newInstance();
        //7 设置OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);
        //8 获取writer写数据,写完关闭writer
        RecordWriter<NullWritable, Text> writer = format.getRecordWriter(hadoopAttemptContext);
        writer.write(null, new Text("shao"));
        writer.write(null, new Text("nai"));
        writer.write(null, new Text("yi"));
        writer.write(null, new Text("bigdata-man"));
        writer.close(hadoopAttemptContext);
        //9 committer提交job和task
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }
}


2. 读文件完整代码
package com.shaonaiyi.hadoop.filetype.text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 11:38
 * @Description Hadoop支持的文件格式之读Text
 */
public class MRTextFileReader {
    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {
        //1 构建一个job实例
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);
        //2 设置需要读取的文件全路径
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/text");
        //3 获取读取文件的格式
        TextInputFormat inputFormat = TextInputFormat.class.newInstance();
        //4 获取需要读取文件的数据块的分区信息
        //4.1 获取文件被分成多少数据块了
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);
        //读取每一个数据块的数据
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                RecordReader reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                try {
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    System.out.println("<key,value>");
                    System.out.println("-----------");
                    while (reader.nextKeyValue()) {
                        System.out.println("<"+reader.getCurrentKey() + "," + reader.getCurrentValue()+ ">" );
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}


0x03 校验结果


1. 启动集群

a. 启动HDFS集群,

start-dfs.sh

PS:如不启动会处于一个卡死状态

Exception in thread "main" java.net.ConnectException: Call From shaonaiyi/192.168.98.205 to master:9999 failed on connection exception: java.net.ConnectException: Connection refused: no further information; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused


2. 执行写Text文件格式代码

a. 直接在Win上执行,如无报错,去集群查看结果:

hadoop fs -ls hdfs://master:9999/user/hadoop-sny/mr/filetype
hadoop fs -cat hdfs://master:9999/user/hadoop-sny/mr/filetype/text/*


image.png


PS:如果报权限错误:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/user/hadoop-sny":hadoop-sny:supergroup:drwxr-xr-x


解决方案:需要去集群里修改权限

hadoop fs -mkdir -p hdfs://master:9999/user/hadoop-sny/mr/filetype
hadoop fs -chmod 757 hdfs://master:9999/user/hadoop-sny/mr/filetype


3. 执行读Text文件格式代码

a. 可得如下结果

image.png


System.out.println("<"+reader.getCurrentKey() + "," + reader.getCurrentValue()+ ">" );


reader.getCurrentKey()为偏移量,每一行的起始值

reader.getCurrentValue()为具体的值

与MapReduce单词计数例子时一样。


0x04 可能出现的问题解决


1. 类无法导入

image.png


请参考文章:“Usage of API documented as @since 1.8+”报错的解决办法


0xFF 总结


Hadoop支持的文件格式系列:

Hadoop支持的文件格式之Text

Hadoop支持的文件格式之Avro

Hadoop支持的文件格式之Parquet

Hadoop支持的文件格式之SequenceFile

项目实战中,文章:网站用户行为分析项目之会话切割(二)中使用的存储格式是Parquet。


相关文章
|
8月前
|
分布式计算 大数据 Hadoop
【大数据开发技术】实验03-Hadoop读取文件
【大数据开发技术】实验03-Hadoop读取文件
131 0
|
10天前
|
存储 分布式计算 Hadoop
Hadoop 集群小文件归档 HAR、小文件优化 Uber 模式
该文介绍了Hadoop中两种小文件管理策略。首先,通过Hadoop Archive (HAR)将小文件归档成大文件以减少存储和管理开销。操作包括使用`hadoop archive`命令进行归档和解档。其次,文章讨论了小文件优化的Uber模式,这种模式在同一JVM中运行所有MapReduce任务以提高效率和局部性,但可能引发单点故障和资源限制问题。启用Uber模式需在`mapred-site.xml`配置文件中设置相关参数。文中还提供了使用WordCount例子验证Uber模式配置的步骤。
|
6月前
|
存储 分布式计算 Hadoop
Hadoop分块存储解析及还原分块存储的文件
Hadoop分块存储解析及还原分块存储的文件
29 0
|
6月前
|
分布式计算 Hadoop Linux
解决Hadoop在浏览器中Browse Directory,无法下载文件的问题
解决Hadoop在浏览器中Browse Directory,无法下载文件的问题
48 0
|
7月前
|
分布式计算 Hadoop Java
Hadoop学习笔记:运行wordcount对文件字符串进行统计案例
Hadoop学习笔记:运行wordcount对文件字符串进行统计案例
41 0
|
9月前
|
SQL 存储 分布式计算
大数据Hadoop小文件问题与企业级解决方案
大数据Hadoop小文件问题与企业级解决方案
48 0
|
10月前
|
机器学习/深度学习 分布式计算 资源调度
Hadoop3 Centos 7编译安装和文件配置(内附编译好的包)
Hadoop3 Centos 7编译安装和文件配置(内附编译好的包)
176 1
|
存储 分布式计算 Java
|
分布式计算 Java Hadoop
hadoop实现文件的上传下载
hadoop实现文件的上传下载
|
分布式计算 Hadoop Shell
开启hadoop后对文件执行shell操作
开启hadoop后对文件执行shell操作

相关实验场景

更多