首先 本文中的 hadoop join 在实际开发没有用处!
如果在开发中 请使用 cascading groupby, 进行 hadoop join,
本文只是为探讨弄懂 cascading 实现做准备。
当然 如果有有人 hadoop join 过 请联系我,大家交流下 !
文件可能需要的一些参考:
hadoop jython ( windows )
jython ,jython 编译以及jar 包
少量 linux shell
本文介绍 hadoop 可能使用到的 join 接口测试 ,已经参考:
使用Hadoop实现Inner Join操作的方法【from淘宝】 :http://labs.chinamobile.com/groups/58_547
下面 测试后 ,我这大体上 对 hadoop join 的方式是这样理解的 (猜想):
数据1 ; 数据2
job1 .map( 数据1 ) =(临时文件1)> 文件标示1+需要join列 数据
job2 .map( 数据2 ) =(临时文件2)> 文件标示2+需要join列 数据
临时文件 mapred.join.expr 生成
job3.map ->
文件标示1+需要join列 : 数据
文件标示2+需要join列 : 数据
......
job3.Combiner - >
需要join列 : 文件标示1+数据
需要join列 : 文件标示2+数据
job3.Reducer->
需要join列 : 使用 java-list > 生成
文件2-列x [ 数据,数据... ]
文件1-列x [ 数据,数据... ]
然后 你这 left join ,或 inner join 或 xxx join 逻辑 就自己来吧
结果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1
修改 ..../hadoop-0.18.3/src/examples/python/compile
unset IFS
/ home /xx / del / jobs / tools / jython2 . 2.1 / jythonc - p org . apache . hadoop . examples - d - j $ 1 . jar - c $ 1 . py
/ home /xx / del / jobs / tools / hadoop - 0.18 . 3 / bin / hadoop jar $ 1 . jar $ 2 $ 3 $ 4 $ 5 $ 6 $ 7 $ 8 $ 9
简单
数据 链接
:
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
结果:
[xx@localhost wc78]$ cat ../wc78/part-00000
0 1
0 2
2 4
2 2
4 3
4 3
6 1
6 4
8 5
简单的数据 join :
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000
0 2
2 4
4 3
6 1
如果在开发中 请使用 cascading groupby, 进行 hadoop join,
本文只是为探讨弄懂 cascading 实现做准备。
当然 如果有有人 hadoop join 过 请联系我,大家交流下 !
文件可能需要的一些参考:
hadoop jython ( windows )
jython ,jython 编译以及jar 包
少量 linux shell
本文介绍 hadoop 可能使用到的 join 接口测试 ,已经参考:
使用Hadoop实现Inner Join操作的方法【from淘宝】 :http://labs.chinamobile.com/groups/58_547
下面 测试后 ,我这大体上 对 hadoop join 的方式是这样理解的 (猜想):
数据1 ; 数据2
job1 .map( 数据1 ) =(临时文件1)> 文件标示1+需要join列 数据
job2 .map( 数据2 ) =(临时文件2)> 文件标示2+需要join列 数据
临时文件 mapred.join.expr 生成
job3.map ->
文件标示1+需要join列 : 数据
文件标示2+需要join列 : 数据
......
job3.Combiner - >
需要join列 : 文件标示1+数据
需要join列 : 文件标示2+数据
job3.Reducer->
需要join列 : 使用 java-list > 生成
文件2-列x [ 数据,数据... ]
文件1-列x [ 数据,数据... ]
然后 你这 left join ,或 inner join 或 xxx join 逻辑 就自己来吧
结果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1
修改 ..../hadoop-0.18.3/src/examples/python/compile
#
!/usr/bin/env bash
export HADOOP_HOME =/ home /xx / del / jobs / tools / hadoop - 0.18 . 3
export CASCADING_HOME =/ home /xx / del / jobs / tools / cascading - 1.0 . 16 - hadoop - 0.18 . 3
export JYTHON_HOME =/ home /xx / del / jobs / tools / jython2 . 2.1
export CLASSPATH = " $HADOOP_HOME/hadoop-0.18.3-core.jar "
# so that filenames w/ spaces are handled correctly in loops below
IFS =
# add libs to CLASSPATH
for f in $HADOOP_HOME / lib /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $HADOOP_HOME / lib / jetty - ext /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $CASCADING_HOME /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $CASCADING_HOME / lib /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $JYTHON_HOME /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
# restore ordinary behaviour
export HADOOP_HOME =/ home /xx / del / jobs / tools / hadoop - 0.18 . 3
export CASCADING_HOME =/ home /xx / del / jobs / tools / cascading - 1.0 . 16 - hadoop - 0.18 . 3
export JYTHON_HOME =/ home /xx / del / jobs / tools / jython2 . 2.1
export CLASSPATH = " $HADOOP_HOME/hadoop-0.18.3-core.jar "
# so that filenames w/ spaces are handled correctly in loops below
IFS =
# add libs to CLASSPATH
for f in $HADOOP_HOME / lib /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $HADOOP_HOME / lib / jetty - ext /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $CASCADING_HOME /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $CASCADING_HOME / lib /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
for f in $JYTHON_HOME /*. jar; do
CLASSPATH = ${CLASSPATH} : $f ;
done
# restore ordinary behaviour
unset IFS
/ home /xx / del / jobs / tools / jython2 . 2.1 / jythonc - p org . apache . hadoop . examples - d - j $ 1 . jar - c $ 1 . py
/ home /xx / del / jobs / tools / hadoop - 0.18 . 3 / bin / hadoop jar $ 1 . jar $ 2 $ 3 $ 4 $ 5 $ 6 $ 7 $ 8 $ 9
from
org.apache.hadoop.fs
import
Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName( " wordcount " )
conf.setMapperClass( tMap )
FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[ 1 : - 1 ]])
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setOutputPath(Path(args[ - 1 ]))
JobClient.runJob(conf)
if __name__ == " __main__ " :main(sys.argv)
运行
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName( " wordcount " )
conf.setMapperClass( tMap )
FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[ 1 : - 1 ]])
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setOutputPath(Path(args[ - 1 ]))
JobClient.runJob(conf)
if __name__ == " __main__ " :main(sys.argv)
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
结果:
[xx@localhost wc78]$ cat ../wc78/part-00000
0 1
0 2
2 4
2 2
4 3
4 3
6 1
6 4
8 5
简单的数据 join :
from
org.apache.hadoop.fs
import
Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName( " wordcount " )
conf.setMapperClass( tMap )
conf.set( " mapred.join.expr " , CompositeInputFormat.compose( " override " ,TextInputFormat, args[ 1 : - 1 ] ) )
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setInputFormat(CompositeInputFormat)
conf.setOutputPath(Path(args[ - 1 ]))
JobClient.runJob(conf)
if __name__ == " __main__ " :main(sys.argv)
运行结果 ( ) :
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName( " wordcount " )
conf.setMapperClass( tMap )
conf.set( " mapred.join.expr " , CompositeInputFormat.compose( " override " ,TextInputFormat, args[ 1 : - 1 ] ) )
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setInputFormat(CompositeInputFormat)
conf.setOutputPath(Path(args[ - 1 ]))
JobClient.runJob(conf)
if __name__ == " __main__ " :main(sys.argv)
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000
0 2
2 4
4 3
6 1
8 5
本文转自博客园刘凯毅的博客,原文链接:hadoop jython join ( 1 ),如需转载请自行联系原博主。