Hadoop的输入输出格式(重要)

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: <p><span style="font-size:14px">首先说一下Hadoop中预定义的Mapper 与Reducer</span></p> <p><span style="font-size:14px"><img src="" alt=""><br></span></p> <p><span style="font-size:14px"><img src="http://img

首先说一下Hadoop中预定义的Mapper 与Reducer




InputFormat接口决定了输入文件如何被hadoop分块(split up)接受。

TextInputFormat是InputFormat的默认实现,对于输入数据中没有明确的key值时非常有效,TextInputFormat返回的key值为字符在输入块中的行数,value为这行的内容。

其他InputFormat的子类还有

KeyValueTextInputFormat(键:Text,值:Text)他的分割符默认为tab("\t"),我们可以通过key.value.separator.input.line.property设置

SequenceFileInputFormat<K,V>(键和值都是由用户定义)

NLineInputFormat(键:LongWritable,值:Text)等

你的MapReduce程序如果要是用KeyValueTextInputFormat作为输入格式,我们可以这样做:

conf.setInputFormat(KeyValueTextInputFormat.class);

package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

以上是InputFormat的定义,里边有两个方法。

第一个方法的功能是确认所有的文件为输入数据并且把他们分成块(splits)。每一个任务都被分配一个块(split)。

第二个方式的功能是提供一个对象用来遍历给定块(split)中的记录,并且把记录解析成先前定义的key和value类型。


一般来说getSplits()方法不用我们去管。其实以上所举出的InputFormat的子类都是继承子FileInputFormat的,FileInputFormat实现了getSplits方法,把getRecordReader()方法留给子类去实现。

我们在使用FileInputFormat时,我们主要的精力在定制合适的RecordReader类上,因为他负责如何将splits解析为records,将records解析成为key/value对。

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.DataInput;
public interface RecordReader<K, V> {
  boolean next(K key, V value) throws IOException;
  K createKey();
  V createValue();
  long getPos() throws IOException;
  public void close() throws IOException;
  float getProgress() throws IOException;
}


以上为RecordReader的签名,我们一般从现有的RecordReader的子类中来定制自己的RecordReader。主要是实现next()方法。

比如LineRecordReader继承了RecordReader<LongWritable, Text>,TextInputFormat使用了这个Reader。KeyValueLineRecordReader用在了KeyValueTextInputFormat类中。

下面我们自己实现一个RecordReader类,她将要解析的key为Text,value为URLWritable(自己定义)

首先我们需要定义URLWritable这个类,有了这个以后我们就可以定义我们的Reader了

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.RecordReader;


public class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {

	private KeyValueLineRecordReader lineReader;
	private Text lineKey, lineValue;
	public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException {
		lineReader = new KeyValueLineRecordReader(job, split);
		
		lineKey = lineReader.createKey();
		lineValue = lineReader.createValue();
	}
	@Override
	public boolean next(Text key, URLWritable value) throws IOException {
		// TODO Auto-generated method stub
		if (!lineReader.next(lineKey, lineValue)) {
			return false;	
		}
		key.set(lineKey);
		value.set(lineValue.toString());
		return true;
		
	}

	@Override
	public Text createKey() {
		// TODO Auto-generated method stub
		return new Text("");
	}

	@Override
	public URLWritable createValue() {
		// TODO Auto-generated method stub
		return new URLWritable();
	}

	@Override
	public long getPos() throws IOException {
		// TODO Auto-generated method stub
		return lineReader.getPos();
	}

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		lineReader.close();
	}

	@Override
	public float getProgress() throws IOException {
		// TODO Auto-generated method stub
		return lineReader.getProgress();
	}

}
class URLWritable implements Writable {

	protected URL url;
	public URLWritable() {}
	public URLWritable(URL url) {
		this.url = url;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(url.toString());
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		url = new URL(in.readUTF());
	}
	public void set(String s) throws MalformedURLException {
		url = new URL(s);
	}
	
}
可以看到,我们在TimeUrlLineRecordReader类中创建了一个KeyValueLineRecordReader对象,然后在实现getPos()、getProgress()、close()方法是直接是哦那个他对应的方法即可。

在next()方法中,我Text类型的lineValue转型为URLWritable类型了。

这时我们就可呀使用这个RecordReader了

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;


public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {

	@Override
	public RecordReader<Text, URLWritable> getRecordReader(InputSplit split,
			JobConf job, Reporter reporter) throws IOException {
		// TODO Auto-generated method stub
		return new TimeUrlLineRecordReader(job, (FileSplit)split);
	}

}


OutputFormat和InputFormat类似,但是有些地方不一样。

和InputFormat一样,所有的OutputFormat大多数继承自FileOutputFormat,但是NullOutputForma和DBOutputFormat。他们是为专门领域的应用程序预留的。


TextOutputFormat是默认的输入格式,key value 用一个tab分开,分割符也可以通过mapred.textoutputformat.separator属性设置

TextOutputFormat的输出格式可以被KeyValueTextInputFormat接受

如果输出的key类型为NullWritable的输出格式可以被TextInputFormat接受。在这中情况下key没有被输出来,也没有分割符。

如果不想让reduce程序有任何输出我们可以把输出格式设置为NullOutputFormat。阻止hadoop的输出在reducer用自己的方式输出文件而不许要hadoop框架些任何额外文件是十分有用。

SequenceFileOutuputFormat它把输出写入到一个串文件中(sequence files),这样我么就可以用SequenceFileInputFormat读会数据。这在处理多个联系mapreduce任务时十分有效。

PS:hadoop中预定义的InputFormat与OutputFormat




目录
相关文章
|
8月前
|
存储 分布式计算 Hadoop
[hadoop3.x系列]Hadoop常用文件存储格式及BigData File Viewer工具的使用(三)
[hadoop3.x系列]Hadoop常用文件存储格式及BigData File Viewer工具的使用(三)
132 2
|
8月前
|
存储 分布式计算 Hadoop
hadoop中压缩及存储常见格式图解
hadoop中压缩及存储常见格式图解
85 0
|
存储 分布式计算 Linux
基于Hadoop生态系统的一种高性能数据存储格式CarbonData(性能篇)
CarbonData在数据查询的性能表现比Parquet好很多,在写一次读多次的场景下非常适合使用;社区比较活跃,响应也很及时。目前官网发布版本1.3.0与最新的spark稳定版Spark2.2.1集成,增加了支持标准的Hive分区,支持流数据准实时入库等新特性,相信会有越来越多的项目会使用到。
3981 0
|
分布式计算 Hadoop
Hadoop使用lzo压缩格式
在hadoop中搭建lzo环境: wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.06.tar.gz   export CFLAGS=-m64   .
891 0
|
分布式计算 Hadoop 数据库
Hadoop学习笔记(一):MapReduce的输入格式
    Hadoop学习有一段时间了,但是缺乏练手的项目,老是学了又忘。想想该整理一个学习笔记啥的,这年头打字比写字方便。果断开博客,咩哈哈~~     开场白结束(木有文艺细胞)     默认的MapReduce作业 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configu
1941 0
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
201 6
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
88 2
|
12天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
46 4