Hadoop MapReduce编程 API入门系列之join(二十六)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介:

 

 

 

天气记录数据库

Station ID Timestamp Temperature

 

 

 

气象站数据库

Station ID Station Name

 

 

 

 

气象站和天气记录合并之后的示意图如下所示。

Station ID Station Name Timestamp Temperature
011990-99999 SIHCCAJAVRI 195005150700 0
011990-99999 SIHCCAJAVRI 195005151200 22
011990-99999 SIHCCAJAVRI 195005151800 -11
012650-99999 TYNSET-HANSMOEN 194903241200 111
012650-99999 TYNSET-HANSMOEN 194903241800 78

 

 

     连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每一个节点之中, 则可以执行一个 MapReduce 作业,将各个气象站的天气记录放到一块,从而实现连接。mapper 或 reducer 根据各气象站 ID 从较小的数据集合中找到气象站元数据,从而完成气象站数据和天气记录数据的合并。 

     连接操作如果由 mapper 执行,则称为 “map 端连接” ;如果由 reducer 执行,则称为 “reduce 端连接”。

     如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用 MapReduce 来进行连接,至于到底采用 map 端连接还是 reduce 端连接,则取决于数据的组织方式。

 

 

map 端连接

        在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。 各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格,但这的确合乎 MapReduce 作业的输出。

        map 端连接操作可以连接多个作业的输出,只要这些作业的 reducer 数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。 在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站 ID 部分排序,而且 reducer 的数量相同,则就满足了执行 map 端连接的前提条件。

        利用 org.apache.hadoop.mapreduce.join 包中的 CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat 类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。详情与示例可参考包文档。此种方法不常用,了解即可,这里不再赘述。

 

 

 

 

 

reduce 端连接

        由于 reduce 端连接并不要求输入数据集符合特定结构,因而 reduce端连接比 map 端连接更为常用。但是,由于两个数据集均需经过 MapReduce 的 shuffle 过程, 所以 reduce 端连接的效率往往要低一些。基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。 我们通过下面两种技术实现 reduce 端连接。

        1、多输入

        数据集的输入源往往有多种格式,因此可以使用 MultipleInputs 类来方便地解析和标注各个数据源。MultipleInputs 的用法,在输入格式课程已经详细介绍,这里就不再赘述。

        2、二次排序

        如前所述,reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer 会非常重要。 还以上面的天气数据连接为例,当天气记录发送到 reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer ,使得 reducer 能够将气象站名称填到天气记录之中就马上输出。 虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer 的可用内存容量。 因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果。

        我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达。一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1。

 

 

 

JoinStationMapper 处理来自气象站数据,代码如下所示。

public class JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split("\\s+");//解析气象站数据 int length = arr.length; if(length==2){//满足这种数据格式 //key=气象站id value=气象站名称 context.write(new TextPair(arr[0],"0"),new Text(arr[1])); } } }

 

 

 

 

    JoinRecordMapper 处理来自天气记录数据,代码如下所示。

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split("\\s+",2);//解析天气记录数据 int length = arr.length; if(length==2){ //key=气象站id value=天气记录数据 context.write(new TextPair(arr[0],"1"),new Text(arr[1])); } } }




 

    由于 TextPair 经过了二次排序,所以 reducer 会先接收到气象站数据。因此从中抽取气象站名称,并将其作为后续每条输出记录的一部分写到输出文件。JoinReducer 的代码如下所示。

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{ protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{ Iterator< Text> iter = values.iterator(); Text stationName = new Text(iter.next());//气象站名称 while(iter.hasNext()){ Text record = iter.next();//天气记录的每条数据 Text outValue = new Text(stationName.toString()+"\t"+record.toString()); context.write(key.getFirst(),outValue); } } }


     上面 JoinReducer 里面的代码,假设天气记录的每个气象站 ID 恰巧与气象站数据集中的一条记录准确匹配。 如果该假设不成立,则需要泛化代码,使用另一个 TextPair 将标记放入值的对象中。reduce() 方法在处理天气记录之前,要能够区分哪些记录是气象站名称, 处理缺失或重复的记录。

 

 

 

 

 

     下面我们定义作业的驱动类 JoinRecordWithStationName,在该类中,关键在于根据组合键的第一个字段(即气象站 ID)进行分区和分组,即使用一个自定义的 Partitioner 和 一个自定义的分组 comparator 作为TextPair 的嵌套类。JoinRecordWithStationName 类的代码如下所示。

public class JoinRecordWithStationName extends Configured implements Tool{
	public static class KeyPartitioner extends Partitioner< TextPair,Text>{ @overwrite public int getPartition(TextPair key,Text value,int numPartitions){ return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions; } } @overwrite public int run(String[] args) throws Exception{ Configuration conf = new Configuration();// 读取配置文件 Job job = Job.getInstance();// 新建一个任务 job.setJarByClass(JoinRecordWithStationName.class);// 主类 Path recordInputPath = new Path(args[0]);//天气记录数据源 Path stationInputPath = new Path(args[1]);//气象站数据源 Path outputPath = new Path(args[2]);//输出路径 MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper FileOutputFormat.setOutputPath(job,outputPath); job.setPartitionerClass(KeyPartitioner.class);//自定义分区 job.setGroupingComparatorClass(TextPair.FirstComparator.class);//自定义分组 job.setMapOutputKeyClass(TextPair.class); job.setReducerClass(JoinReducer.class);// Reducer job.setOutputKeyClass(Text.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new JoinRecordWithStationName(),args); System.exit(exitCode); } }

 

 

  该样本数据上运行程序,获得以下输出结果。

011990-99999	SIHCCAJAVRI	195005150700	0
011990-99999	SIHCCAJAVRI	195005151200	22
011990-99999	SIHCCAJAVRI	195005151800	-11
012650-99999	TYNSET-HANSMOEN	194903241200	111
012650-99999	TYNSET-HANSMOEN	194903241800	78

 

 

 

 

 

分布式缓存

        当 MapReduce 处理大型数据集间的 join 操作时,此时如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每个节点之中。 这种情况下,我们就用到了 Hadoop 的分布式缓存机制,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为了节约网络宽带,在每一个作业中, 各个文件通常只需要复制到一个节点一次。

 

 1、用法

        Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。

        1)用户可以使用 -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。

        2)用户可以使用 -archives 选项向自己的任务中复制存档文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。

        3)用户可以使用 -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。

 2、工作机制

        当用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中。接着,在任务运行之前, tasktracker 将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为 “本地化” 了。从任务的角度来看, 这些文件就已经在那儿了,它并不关心这些文件是否来自 HDFS 。此外,有 -libjars 指定的文件会在任务启动前添加到任务的类路径(classpath)中。

 3、分布式缓存 API

        由于可以通过 Hadoop 命令行间接使用分布式缓存,大多数应用不需要使用分布式缓存 API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用 API 了。 API 包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。

 

   1)首先掌握数据放到缓存中的方法,以下列举 Job 中可将数据放入到缓存中的相关方法:

public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);//以上两组方法将文件或存档添加到分布式缓存
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);//以上两组方法将一次性向分布式缓存中添加一组文件或存档
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);//以上两组方法将文件或存档添加到 MapReduce 任务的类路径
public void createSymlink();

     在缓存中可以存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。


 

     2)其次掌握在 map 或者 reduce 任务中,使用 API 从缓存中读取数据。

public Path[] getLocalCacheFiles() throws IOException;
public Path[] getLocalCacheArchives() throws IOException;
public Path[] getFileClassPaths();
public Path[] getArchiveClassPaths();

 

    我们可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法获取缓存中的文件或者存档的引用。 当处理存档时,将会返回一个包含解档文件的目的目录。相应的,用户可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。

        下面我们仍然以前面的气象站数据和天气记录数据为例,使用分布式缓存 API,完成两个数据集的连接操作。完整的 MapReduce 程序如下所示。

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool {

	public static class TemperatureMapper extends
			Mapper< LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] arr = value.toString().split("\t", 2);
			if (arr.length == 2) {
				context.write(new Text(arr[0]), value);
			}

		}
	}

	public static class TemperatureReducer extends
			Reducer< Text, Text, Text, Text> { private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据 /** * 获取分布式缓存文件 */ protected void setup(Context context) throws IOException, InterruptedException { Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径 if (localPaths.length == 0) { throw new FileNotFoundException( "Distributed cache file not found."); } FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例 FSDataInputStream in = null; in = fs.open(new Path(localPaths[0].toString()));// 打开输入流 BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器 String infoAddr = null; while (null != (infoAddr = br.readLine())) {// 按行读取并解析气象站数据 String[] records = infoAddr.split("\t"); table.put(records[0], records[1]);//key为stationID,value为stationName } } public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException { String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName for (Text value : values) { context.write(new Text(stationName), value); } } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(new URI( "hdfs://single.hadoop.dajiangtai.com:9000"), conf); Path out = new Path(args[1]); if (hdfs.isDirectory(out)) { hdfs.delete(out, true); } Job job = Job.getInstance();//获取一个job实例 job.setJarByClass(JoinRecordWithStationName.class); FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(args[0])); FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path( args[1])); //添加分布式缓存文件 station.txt job.addCacheFile(new URI("hdfs://HadoopMaster:9000/middle/temperature/station.txt")); job.setMapperClass(TemperatureMapper.class); job.setReducerClass(TemperatureReducer.class); job.setOutputKeyClass(Text.class);// 输出key类型 job.setOutputValueClass(Text.class);// 输出value类型 return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { String[] arg = { "hdfs://HadoopMaster:9000/middle/temperature/records.txt", "hdfs://HadoopMaster:9000/middle/temperature/out/" }; int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), arg); System.exit(ec); } }

 

     添加分布式缓存文件相对简单,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加缓存文件即可。需要注意的是,在获取获取缓存文件时,文件将以 “本地的” Path 对象的形式返回。为了读取文件,用户需要首先使用 getLocal()方法获得一个 Hadoop 本地 FileSystem 实例。本程序中,我们在 Reduce 的 setup() 方法中获取缓存文件。

        以下是示例数据集的输出结果,达到我们预期的效果。

SIHCCAJAVRI	011990-99999	195005151800	-11
SIHCCAJAVRI	011990-99999	195005151200	22
SIHCCAJAVRI	011990-99999	195005150700	0
TRNSET-HANSMOEN	012650-99999	194903241800	78
TRNSET-HANSMOEN	012650-99999	194903241200	111

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 得到结果

SIHCCAJAVRI	011990-99999	195005151800	-11
SIHCCAJAVRI	011990-99999	195005151200	22
SIHCCAJAVRI	011990-99999	195005150700	0
TRNSET-HANSMOEN	012650-99999	194903241800	78
TRNSET-HANSMOEN	012650-99999	194903241200	111

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 代码版本1

package zhouls.bigdata.myMapReduce.Join;

import java.util.Set;


import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair>
{
private Text first; //Text 类型的实例变量first
private Text second;//Text 类型的实例变量second

public TextPair() //无参构造方法
{
set(new Text(),new Text());
}
public TextPair(String first,String second) // Sting类型参数的构造方法
{
set(new Text(first),new Text(second));
}
public TextPair(Text first,Text second) // Text类型参数的构造方法
{
set(first,second);
}
public void set(Text first,Text second) //set方法
{
this.first=first;
this.second=second;
}
public Text getFirst() //getFirst方法
{
return first;
}
public Text getSecond() //getSecond方法
{
return second;
}

//将对象转换为字节流并写入到输出流out中
public void write(DataOutput out) throws IOException //write方法
{
first.write(out);
second.write(out);
}

//从输入流in中读取字节流反序列化为对象
public void readFields(DataInput in) throws IOException //readFields方法
{
first.readFields(in);
second.readFields(in);
}

@Override
public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
{
return first.hashCode() *163+second.hashCode();
}

@Override
public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
{
if (o instanceof TextPair)
{
TextPair tp=(TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}

@Override
public String toString() //toString方法
{
return first +"\t"+ second;
}
public int compareTo(TextPair o)
{
// TODO Auto-generated method stub
if(!first.equals(o.first))
{
return first.compareTo(o.first);
}
else if(!second.equals(o.second))
{
return second.compareTo(o.second);
}
return 0;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import zhouls.bigdata.myMapReduce.Join.TextPair;

public class JoinStationMapper extends Mapper<LongWritable,Text,TextPair,Text>

protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据
int length = arr.length;
if(length==2)
{//满足这种数据格式
//key=气象站id value=气象站名称
System.out.println("station="+arr[0]+"0");
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>
{
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
{
Iterator< Text> iter = values.iterator();
Text stationName = new Text(iter.next());//气象站名称
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>

protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
int length = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));

}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.BufferedReader;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool 
{
public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, Text>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{
String[] arr = value.toString().split("\t", 2);
if (arr.length == 2) 
{
context.write(new Text(arr[0]), value);
}

}
}

public static class TemperatureReducer extends Reducer< Text, Text, Text, Text> 
{
private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据

/**
* 获取分布式缓存文件
*/
protected void setup(Context context) throws IOException,
InterruptedException
{
Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径 
if (localPaths.length == 0) 
{
throw new FileNotFoundException("Distributed cache file not found.");
}
FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例
FSDataInputStream in = null;

in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
String infoAddr = null;
while (null != (infoAddr = br.readLine()))
{// 按行读取并解析气象站数据
String[] records = infoAddr.split("\t");
table.put(records[0], records[1]);//key为stationID,value为stationName
}
}

public void reduce(Text key, Iterable< Text> values, Context context)
throws IOException, InterruptedException 
{
String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName
for (Text value : values)
{
context.write(new Text(stationName), value);
}
}
}

public int run(String[] args) throws Exception
{
// TODO Auto-generated method stub
Configuration conf = new Configuration();

// FileSystem hdfs = FileSystem.get(new URI("hdfs://HadoopMaster:9000"), conf);
// Path out = new Path(args[1]);
// if (hdfs.isDirectory(out)) 
// {
// hdfs.delete(out, true);
// }

Job job = Job.getInstance();//获取一个job实例
job.setJarByClass(JoinRecordWithStationName.class);

// FileInputFormat.addInputPath(job,
// new org.apache.hadoop.fs.Path(args[0]));
// FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(args[1]));


FileInputFormat.addInputPath(job,
new org.apache.hadoop.fs.Path("./data/join/station.txt"));
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("./out/join/"));

//添加分布式缓存文件 station.txt
// job.addCacheFile(new URI("hdfs://HadoopMaster:9000/join/station.txt"));
job.addCacheFile(new URI("./data/join/station.txt"));


job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class);

job.setOutputKeyClass(Text.class);// 输出key类型
job.setOutputValueClass(Text.class);// 输出value类型

return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception 
{
// String[] arg = {
// "hdfs://HadoopMaster:9000/join/records.txt",
// "hdfs://HadoopMaster:9000/join/out/" };
//
String[] arg = {
"./data/join/records.txt",
"./out/join/" };


int ec = ToolRunner.run(new Configuration(),new JoinRecordWithStationName(), arg);
System.exit(ec);
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

public class JoinRecordAndStationName
{

/**
* @param args
*/
public static void main(String[] args) 
{
// TODO Auto-generated method stub

}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码版本2 

package zhouls.bigdata.myMapReduce.Join;

import java.util.Set;


import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair>
{
private Text first; //Text 类型的实例变量first
private Text second;//Text 类型的实例变量second

public TextPair() //无参构造方法
{
set(new Text(),new Text());
}
public TextPair(String first,String second) // Sting类型参数的构造方法
{
set(new Text(first),new Text(second));
}
public TextPair(Text first,Text second) // Text类型参数的构造方法
{
set(first,second);
}
public void set(Text first,Text second) //set方法
{
this.first=first;
this.second=second;
}
public Text getFirst() //getFirst方法
{
return first;
}
public Text getSecond() //getSecond方法
{
return second;
}

//将对象转换为字节流并写入到输出流out中
public void write(DataOutput out) throws IOException //write方法
{
first.write(out);
second.write(out);
}

//从输入流in中读取字节流反序列化为对象
public void readFields(DataInput in) throws IOException //readFields方法
{
first.readFields(in);
second.readFields(in);
}

@Override
public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
{
return first.hashCode() *163+second.hashCode();
}

@Override
public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
{
if (o instanceof TextPair)
{
TextPair tp=(TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}

@Override
public String toString() //toString方法
{
return first +"\t"+ second;
}
public int compareTo(TextPair o)
{
// TODO Auto-generated method stub
if(!first.equals(o.first))
{
return first.compareTo(o.first);
}
else if(!second.equals(o.second))
{
return second.compareTo(o.second);
}
return 0;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import zhouls.bigdata.myMapReduce.Join.TextPair;

public class JoinStationMapper extends Mapper<LongWritable,Text,TextPair,Text>

protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+");//解析气象站数据
int length = arr.length;
if(length==2)
{//满足这种数据格式
//key=气象站id value=气象站名称
System.out.println("station="+arr[0]+"0");
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer< TextPair,Text,Text,Text>
{
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
{
Iterator< Text> iter = values.iterator();
Text stationName = new Text(iter.next());//气象站名称
while(iter.hasNext()){
Text record = iter.next();//天气记录的每条数据
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
context.write(key.getFirst(),outValue);
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>

protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
String line = value.toString();
String[] arr = line.split("\\s+",2);//解析天气记录数据
int length = arr.length;
if(length==2){
//key=气象站id value=天气记录数据
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));

}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

//版本2
package zhouls.bigdata.myMapReduce.Join;

import java.io.InputStream;
import org.apache.hadoop.util.Tool;
import java.io.OutputStream;
import java.util.Set;

import javax.lang.model.SourceVersion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool
{
public static class KeyPartitioner extends Partitioner< TextPair,Text>
{

public int getPartition(TextPair key,Text value,int numPartitions)
{
return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
}
}

public static class GroupingComparator extends WritableComparator
{
protected GroupingComparator()
{
super(TextPair.class,true);
}
@Override
public int compare(WritableComparable w1,WritableComparable w2)
{
TextPair ip1=(TextPair) w1;
TextPair ip2=(TextPair) w2;
Text l=ip1.getFirst();
Text r=ip2.getFirst();
return l.compareTo(r);


}
public int run(String[] args) throws Exception
{
Configuration conf = new Configuration();// 读取配置文件

Path mypath=new Path(args[2]);
FileSystem hdfs=mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath))
{
hdfs.delete(mypath,true);
}

Job job = Job.getInstance(conf,"join");// 新建一个任务
job.setJarByClass(JoinRecordWithStationName.class);// 主类

Path recordInputPath = new Path(args[0]);//天气记录数据源,这里是牵扯到多路径输入和多路径输出的问题。默认是从args[0]开始
Path stationInputPath = new Path(args[1]);//气象站数据源
Path outputPath = new Path(args[2]);//输出路径

//若只有一个输入和一个输出,则输入是args[0],输出是args[1]。
//若有两个输入和一个输出,则输入是args[0]和args[1],输出是args[2]

MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
FileOutputFormat.setOutputPath(job,outputPath);
job.setReducerClass(JoinReducer.class);// Reducer
job.setNumReduceTasks(2);

job.setPartitionerClass(KeyPartitioner.class);//自定义分区
job.setGroupingComparatorClass(GroupingComparator.class);//自定义分组

job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

return job.waitForCompletion(true)?0:1;
}

public static void main(String[] args) throws Exception
{
String[] args0={"hdfs://HadoopMaster:9000/join/records.txt"
,"hdfs://HadoopMaster:9000/join/station.txt"
,"hdfs://HadoopMaster:9000/join/out"
};
int exitCode=ToolRunner.run( new JoinRecordWithStationName(), args0);
System.exit(exitCode);
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.Join;

public class JoinRecordAndStationName
{

/**
* @param args
*/
public static void main(String[] args) 
{
// TODO Auto-generated method stub

}

}

 

本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6166343.html,如需转载请自行联系原作者

相关文章
|
28天前
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
50 7
|
3月前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
106 0
|
1月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
31 3
|
1月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
1月前
|
IDE API 定位技术
Python--API编程:IP地址翻译成实际的物理地址
Python--API编程:IP地址翻译成实际的物理地址
|
2月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
3月前
|
开发者
告别繁琐代码,JSF标签库带你走进高效开发的新时代!
【8月更文挑战第31天】JSF(JavaServer Faces)标准标签库为页面开发提供了大量组件标签,如`&lt;h:inputText&gt;`、`&lt;h:dataTable&gt;`等,简化代码、提升效率并确保稳定性。本文通过示例展示如何使用这些标签实现常见功能,如创建登录表单和展示数据列表,帮助开发者更高效地进行Web应用开发。
42 0
|
3月前
|
前端开发 API 开发者
【React状态管理新思路】Context API入门:从零开始摆脱props钻孔的优雅之道,全面解析与实战案例分享!
【8月更文挑战第31天】React 的 Context API 有效解决了多级组件间状态传递的 &quot;props 钻孔&quot; 问题,使代码更简洁、易维护。本文通过电子商务网站登录状态管理案例,详细介绍了 Context API 的使用方法,包括创建、提供及消费 Context,以及处理多个 Context 的场景,适合各水平开发者学习与应用,提高开发效率和代码质量。
37 0
|
3月前
|
JSON API 数据库
神秘编程力量来袭!Rails 究竟隐藏着怎样的魔力,能构建出强大的 RESTful API?快来一探究竟!
【8月更文挑战第31天】《构建 RESTful API:使用 Rails 进行服务端开发》介绍了如何利用 Ruby on Rails 框架高效构建可扩展的 RESTful API。Rails 采用“约定优于配置”,简化开发流程,通过示例展示了路由定义、控制器设计及模型层交互等内容,帮助开发者快速搭建稳定可靠的服务端。无论小型项目还是大型应用,Rails 均能提供强大支持,提升开发效率。
28 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)