hadoop jython join ( 1 )

简介:
首先 本文中的 hadoop join  在实际开发没有用处!
如果在开发中 请使用  cascading  groupby, 进行 hadoop join,
本文只是为探讨弄懂 cascading 实现做准备。

当然 如果有有人 hadoop join 过 请联系我,大家交流下 !
文件可能需要的一些参考:
hadoop jython ( windows )
jython ,jython 编译以及jar 包 
少量 linux shell 
本文介绍 hadoop 可能使用到的 join 接口测试 ,已经参考:
使用Hadoop实现Inner Join操作的方法【from淘宝】 :http://labs.chinamobile.com/groups/58_547 
下面 测试后 ,我这大体上 对 hadoop  join 的方式是这样理解的 (猜想):
数据1 ; 数据2
job1 .map( 数据1 ) =(临时文件1)>  文件标示1+需要join列  数据
job2 .map( 数据2 ) =(临时文件2)>  文件标示2+需要join列  数据
临时文件  mapred.join.expr  生成
job3.map -> 
文件标示1+需要join列 : 数据
文件标示2+需要join列 : 数据
......
job3.Combiner - >
需要join列 : 文件标示1+数据
需要join列 : 文件标示2+数据
job3.Reducer->
需要join列 : 使用 java-list > 生成 
文件2-列x  [  数据,数据... ]
文件1-列x  [  数据,数据... ]
然后 你这 left join ,或 inner join 或 xxx join 逻辑 就自己来吧
结果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1
修改 ..../hadoop-0.18.3/src/examples/python/compile
# !/usr/bin/env bash

export HADOOP_HOME
=/ home /xx / del / jobs / tools / hadoop - 0.18 . 3
export CASCADING_HOME
=/ home /xx / del / jobs / tools / cascading - 1.0 . 16 - hadoop - 0.18 . 3
export JYTHON_HOME
=/ home /xx / del / jobs / tools / jython2 . 2.1

export CLASSPATH
= " $HADOOP_HOME/hadoop-0.18.3-core.jar "                                             

#  so that filenames w/ spaces are handled correctly in loops below
IFS =

#  add libs to CLASSPATH

for  f in  $HADOOP_HOME / lib /*. jar;  do                                                                
  CLASSPATH
= ${CLASSPATH} : $f ;
done

for  f in  $HADOOP_HOME / lib / jetty - ext /*. jar;  do
  CLASSPATH
= ${CLASSPATH} : $f ;
done

for  f in  $CASCADING_HOME /*. jar;  do
  CLASSPATH
= ${CLASSPATH} : $f ;
done

for  f in  $CASCADING_HOME / lib /*. jar;  do
  CLASSPATH
= ${CLASSPATH} : $f ;
done


for  f in  $JYTHON_HOME /*. jar;  do
  CLASSPATH
= ${CLASSPATH} : $f ;
done

#  restore ordinary behaviour

unset IFS

/ home /xx / del / jobs / tools / jython2 . 2.1 / jythonc  - p org . apache . hadoop . examples  - - j $ 1 . jar   - c $ 1 . py 
/ home /xx / del / jobs / tools / hadoop - 0.18 . 3 / bin / hadoop jar $ 1 . jar $ 2  $ 3  $ 4  $ 5  $ 6  $ 7  $ 8  $ 9   
简单  数据 链接  :
from  org.apache.hadoop.fs  import  Path                                                             
from  org.apache.hadoop.io  import   *                                                                 
from  org.apache.hadoop.mapred.lib  import   *                                                         
from  org.apache.hadoop.mapred.join   import   *                                                       
from  org.apache.hadoop.mapred  import   *                                                             
import  sys                                                                                        
import  getopt                                                                                     
                                                                                                  
class  tMap(Mapper, MapReduceBase):                                                                  
        
def  map(self, key, value, output, reporter):                                              
                output.collect( Text( str(key) ) , Text( value.toString() ))                      
                                                                                       
                                
def  main(args):                                                                                   
        conf 
=  JobConf(tMap)                                                                      
        conf.setJobName(
" wordcount " )                                                              
                                                                                                  
        conf.setMapperClass( tMap )                                                               

        FileInputFormat.setInputPaths(conf,[ Path(sp)  for  sp  in  args[ 1 : - 1 ]])                      
        conf.setOutputKeyClass( Text )
        conf.setOutputValueClass( Text )                                                          

        conf.setOutputPath(Path(args[
- 1 ]))                                                        
        
        JobClient.runJob(conf)                                                                    
        
if   __name__   ==   " __main__ " :main(sys.argv)      
运行 
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
结果:
[xx@localhost wc78]$ cat ../wc78/part-00000 
0    1
0    2
2    4
2    2
4    3
4    3
6    1
6    4
8    5
简单的数据 join :
from  org.apache.hadoop.fs  import  Path
from  org.apache.hadoop.io  import   *
from  org.apache.hadoop.mapred.lib  import   *
from  org.apache.hadoop.mapred.join   import   *
from  org.apache.hadoop.mapred  import   *
import  sys
import  getopt

class  tMap(Mapper, MapReduceBase):
        
def  map(self, key, value, output, reporter):
                output.collect( Text( str(key) ) , Text( value.toString() ))

def  main(args):
        conf 
=  JobConf(tMap)
        conf.setJobName(
" wordcount " )
        conf.setMapperClass( tMap )

        conf.set( " mapred.join.expr " , CompositeInputFormat.compose( " override " ,TextInputFormat, args[ 1 : - 1 ] ) )
        conf.setOutputKeyClass( Text )
        conf.setOutputValueClass( Text )

        conf.setInputFormat(CompositeInputFormat)
     
        conf.setOutputPath(Path(args[
- 1 ]))

        JobClient.runJob(conf)

if   __name__   ==   " __main__ " :main(sys.argv)
         
运行结果 (  ) :
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000 
0    2
2    4
4    3
6    1

8    5

本文转自博客园刘凯毅的博客,原文链接:hadoop jython join ( 1 ),如需转载请自行联系原博主。

目录
相关文章
|
3月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
55 3
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
缓存 分布式计算 Hadoop
hadoop之Map join和Reduce join (13)
hadoop之Map join和Reduce join (13)
151 0
hadoop之Map join和Reduce join (13)
|
分布式计算 Hadoop
Hadoop Join
Hadoop 中的join分为三种 Reduce端join,适合于两个大表 Map端join,适合一个大表和一个小表,小表放到 Distribute Cache里面 semi join 当join只用到其中一个表中的一小...
604 0
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
206 6

相关实验场景

更多