开发者社区> @dailidong@> 正文

Reduce Side Join实现

简介: 关于reduce边join,其最重要的是使用MultipleInputs.addInputPath这个api对不同的表使用不同的Map,然后在每个Map里做一下该表的标识,最后到了Reduce端再根据标识区分对应的表! Reduce Side Join Exa...
+关注继续查看

关于reduce边join,其最重要的是使用MultipleInputs.addInputPath这个api对不同的表使用不同的Map,然后在每个Map里做一下该表的标识,最后到了Reduce端再根据标识区分对应的表!


Reduce Side Join Example

User and comment join

In thisexample, we’ll be using theusers and comments tables from the StackOverflow dataset. Storing data in this matter makessense, as storingrepetitive user data witheach comment is unnecessary. Thiswould also makeupdating user information diffi‐ cult. However,having disjoint data sets posesproblems when it comes to associating a comment with the user who wroteit. Through the use of a reduceside join, thesetwo data sets canbe merged together using the userID as the foreign key. In this example, we’ll perform an inner, outer, and antijoin. The choice of which join to execute is set during job configuration.

Hadoop supportsthe ability to use multipleinput data typesat once, allowingyou to create a mapper classand input formatfor each inputsplit from different data sources. This is extremely helpful, because you don’t have to code logic for two different data inputs in the samemap implementation. In the following example, two mapperclasses are created: one for the user data and one for the comments. Each mapper classoutputs the user ID as the foreignkey, and the entire record as the value along with a single character to flag whichrecord came fromwhat set. Thereducer then copiesall values for eachgroup in memory, keepingtrack of whichrecord came fromwhat data set.The records are then joined togetherand output.

 



The following descriptions of eachcode section explainthe solution to the problem.

Problem: Given a set of user information and a list of user’s comments, enrich each comment with the information about the userwho created thecomment.

 

Drivercode.The job configurationis slightly different from the standard configuration due to the user of themultiple input utility. We also set the join type in the jobconfig‐ uration to args[2] so it can be used in the reducer. The relevant piece of the drivercode to use the MultipleInput follows:

...

// Use MultipleInputs to set which inputuses what mapper

// This will keep parsingof each dataset separate froma logical standpoint

// The firsttwo elements of theargs arrayare the two inputs 

MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,UserJoinMapper.class);

MultipleInputs.addInputPath(job,newPath(args[1]), TextInputFormat.class, CommentJoinMapper.class);

 

job.getConfiguration()..set("join.type", args[2]);

...

 

User mappercode.This mapper parseseach input lineof user dataXML. It grabs theuser ID associated with each record and outputs it along with the entire input  value. It prepends the letter A in front of theentire value. This allows the reducer to know which values came from what data  set.

public static class UserJoinMapper extendsMapper<Object, Text, Text, Text> {

 

private Text outkey =newText();

private Text outvalue =newText();

 

public void map(Object key, Text value, Context context) throwsIOException, InterruptedException {

 

// Parse the input stringinto a nice map

Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

 

String userId = parsed.get("Id");

 

// The foreign join keyis the userID

outkey.set(userId);

 

// Flag this record for the reducerand then outputoutvalue.set("A" + value.toString()); context.write(outkey, outvalue);

}

}



When you output the value from the map side, the entire record doesn’t have to be sent. This is an opportunity to optimize the join by keepingonly the fields of data you want to join together. It requiresmore pro‐ cessing on the map side, but is worthit in the long run. Also, sincethe foreign key is in the map output key, you don’t need to keep that in the value, either.

 

 

Comment mapper code.This mapper parseseach input line of commentXML. Very sim‐ ilar to the UserJoinMapper,it too grabs the user ID associated with each record and outputs it along with the entire inputvalue. The only different here is that the XML attribute UserId representsthe user that posted to comment, where as Id in theuser  data set is the user ID. Here, this mapper prepends the letter B in front ofthe entire value.

public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> {

 

private Text outkey =newText();

private Text outvalue =newText();

 

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

 

Map<String, String> parsed = transformXmlToMap(value.toString());

 

// The foreign join keyis the userID

outkey.set( parsed.get("UserId"));

 

// Flag this record for the reducerand then outputoutvalue.set("B" + value.toString()); context.write(outkey, outvalue);

}

}

 

Reducer code.The reducer code iterates through all thevalues of each group and looks atwhat each record is tagged with and then puts the record in one of two lists.After all values are binned in either list, the actual join logic is executedusing the two lists. The join logic differs slightly based on the type of join,but always involves iterating through both lists and writing to the Context object.The type of join is pulled from the job configuration in the setup method. Let’s look at the main reduce method before looking at the join logic.

public static class UserJoinReducer extendsReducer<Text, Text, Text, Text> {

 

private staticfinal Text EMPTY_TEXT = Text("");

private Text tmp =newText();

private ArrayList<Text> listA =newArrayList<Text>();

private ArrayList<Text> listB =newArrayList<Text>();


private String joinType =null;

 

public void setup(Context context) {

// Get the type of join fromour configuration

joinType=context.getConfiguration().get("join.type");

}

 

public void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

 

// Clear ourlists listA.clear(); listB.clear();

 

// iterate throughall our values,binning each recordbased on what

// it was tagged with.Make sure to remove thetag!

while(values.hasNext()) { tmp=values.next();

if (tmp.charAt(0) == 'A') {

listA.add(new Text(tmp.toString().substring(1)));

} else if (tmp.charAt('0') == 'B') {

listB.add(new Text(tmp.toString().substring(1)));

}

}

 

// Execute our join logicnow that the lists are filled

executeJoinLogic(context);

}

 

private void executeJoinLogic(Context context)

throws IOException, InterruptedException {

...

}

The input data types tothe reducer are two Text objects. The input key isthe foreign join key, which in this example is the user’s ID. The input values associated with the foreign key contain one record from the “users” data set tagged with ‘B’, as well as all the comments the user posted tagged with ‘B’. Any type of data formatting you would want toperform should be done here prior to outputting. For simplicity, the raw XML value from the left data set (users)is output as the key and the raw XML value from the rightdata set (comments) is output as the value.

Next, let’s look at each of the join types. First up is an inner join. If both the lists are not empty, simply performtwo nested forloops and joineach of thevalues together.

if (joinType.equalsIgnoreCase("inner")) {

// If both lists are not empty,join A with B

if (!listA.isEmpty() && !listB.isEmpty()) {

for (Text A : listA) {

for(Text B : listB) { context.write(A, B);


}

}

}

}...

For aleft outer join,if the right list is not empty, join A with B.If the right list is empty, outputeach record of A with an empty string.

... else if(joinType.equalsIgnoreCase("leftouter")) {

// For each entry in A,

for (Text A : listA) {

// If list B is not empty,join A andB

if (!listB.isEmpty()) {

for(Text B : listB) { context.write(A, B);

}

}else{

// Else, outputA by itself

context.write(A, EMPTY_TEXT);

}

}

}...

A rightouter join is very similar, except switching from the check for empty elements fromBto A. If the left list is empty, write records from B withan empty output key.

...else if (joinType.equalsIgnoreCase("rightouter")) {

// For each entry in B,

for (Text B : listB) {

// If list A is not empty,join A andB

if (!listA.isEmpty()) {

for(Text A : listA) { context.write(A, B);

}

} else {

// Else, outputB by itself

context.write(EMPTY_TEXT, B);

}

}

}...

A fullouter join is more complex, in that we want to keep allrecords, ensuring thatwe join records whereappropriate. If list A is not empty, then for everyelement inA, join withB whenthe B listis not empty, or output A by itself. IfA isempty, then just output B.

... else if (joinType.equalsIgnoreCase("fullouter")) {

// If list A is not empty

if (!listA.isEmpty()) {

// For each entry in A

for (Text A : listA) {

// If list B is not empty,join A with B

if (!listB.isEmpty()) {


for(Text B : listB) { context.write(A, B);

}

}else {

// Else, outputA by itself

context.write(A, EMPTY_TEXT);

}

}

} else {

// If list A is empty, just output B

for (Text B : listB) { context.write(EMPTY_TEXT, B);

}

}

}...

For anantijoin, if at least one of the lists is empty, output the recordsfrom the non- empty list with an empty Text object.

... else if(joinType.equalsIgnoreCase("anti")) {

// If list A is empty and B is empty or vice versa

if (listA.isEmpty() ^ listB.isEmpty()) {

 

// Iterate both A and B with null values

// The previous XOR checkwill make sure exactly one of

// these lists is emptyand therefore the list will be skipped

for (Text A : listA) { context.write(A, EMPTY_TEXT);

}

 

for (Text B : listB) { context.write(EMPTY_TEXT, B);


 

}

}


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,云吞铺子总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系统盘、创建快照、配置安全组等操作如何登录ECS云服务器控制台? 1、先登录到阿里云ECS服务器控制台 2、点击顶部的“控制台” 3、通过左侧栏,切换到“云服务器ECS”即可,如下图所示 通过ECS控制台的远程连接来登录到云服务器 阿里云ECS云服务器自带远程连接功能,使用该功能可以登录到云服务器,简单且方便,如下图:点击“远程连接”,第一次连接会自动生成6位数字密码,输入密码即可登录到云服务器上。
33601 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
19217 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
18714 0
阿里云服务器ECS远程登录用户名密码查询方法
阿里云服务器ECS远程连接登录输入用户名和密码,阿里云没有默认密码,如果购买时没设置需要先重置实例密码,Windows用户名是administrator,Linux账号是root,阿小云来详细说下阿里云服务器远程登录连接用户名和密码查询方法
21676 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
25231 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
13948 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
11861 0
如何设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云安全组设置详细图文教程(收藏起来) 阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程。阿里云会要求客户设置安全组,如果不设置,阿里云会指定默认的安全组。那么,这个安全组是什么呢?顾名思义,就是为了服务器安全设置的。安全组其实就是一个虚拟的防火墙,可以让用户从端口、IP的维度来筛选对应服务器的访问者,从而形成一个云上的安全域。
17197 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18989 0
+关注
@dailidong@
专注架构 外功修行,内功修神 CSDN博客:http://blog.csdn.net/odalidong
373
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载