We have two different files that share the same keys; the task is to join them into a single file.
1. File contents
- The two input files, a.txt and b.txt, before the join:
$ head a.txt b.txt
==> a.txt <==
aaa1 hdfs
aaa2 hdfs
aaa3 hdfs
aaa4 hdfs
aaa5 hdfs
aaa6 hdfs
aaa7 hdfs
aaa8 hdfs
aaa9 hdfs
aaa10 hdfs
==> b.txt <==
aaa1 mapreduce
aaa2 mapreduce
aaa3 mapreduce
aaa4 mapreduce
aaa5 mapreduce
aaa6 mapreduce
aaa7 mapreduce
aaa8 mapreduce
aaa9 mapreduce
aaa10 mapreduce
2. Approach
1) First run a tagging map over a.txt and over b.txt separately, so that records from the two files can be told apart later;
2) In the join job, the mapperjoin step simply passes the tagged records from a.txt and b.txt through unchanged;
3) A reducerjoin program then does WordCount-style grouping: lines with the same key arrive together, and the values from a.txt and b.txt are written out after that key, as illustrated below.
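For example, once the tagging mappers have run, every record carries the key, a one-letter flag marking its source file, and the value, separated by tabs. Because the join job sorts on (key, flag), the a.txt record for a key always reaches the reducer just before the matching b.txt record. A sketch of the intermediate records (using the 'a'/'b' tags defined in the mappers below):

aaa1	a	hdfs
aaa1	b	mapreduce
aaa2	a	hdfs
aaa2	b	mapreduce

The reducer then only needs to remember the latest 'a' value and emit it together with the key when the matching 'b' value arrives.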
3. Implementation
3.1 Create the directories and upload the data
$ hadoop fs -mkdir /input/join
$ hadoop fs -mkdir /output/join/
$ hadoop fs -put a.txt b.txt /input/join
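Optionally, confirm the upload before running anything:

$ hadoop fs -ls /input/join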
3.2 The mapperA program
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

def mapper():
    # Tag every record from a.txt with the flag 'a' so the reducer can
    # tell which source file a value came from.
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        print("%s\ta\t%s" % (wordkey, wordvalue))

if __name__ == "__main__":
    mapper()
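A quick local sanity check of the tagging mapper (run from the directory that holds a.txt; the two lines of expected output follow from the sample data in section 1):

$ head -2 a.txt | python mapperA.py
aaa1	a	hdfs
aaa2	a	hdfs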
3.3 The mapperB program
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

def mapper():
    # Identical to mapperA.py except that records are tagged with 'b'.
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        print("%s\tb\t%s" % (wordkey, wordvalue))

if __name__ == "__main__":
    mapper()
3.4 The mapperjoin program
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

def mapper():
    # Identity mapper: the records are already tagged, so just pass
    # them through to the shuffle/sort phase.
    for line in sys.stdin:
        print(line.strip())

if __name__ == "__main__":
    mapper()
3.5 The reducerjoin program
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

def reducer():
    valueA = ''
    for line in sys.stdin:
        wordkey, flag, wordvalue = line.strip().split('\t')
        if flag == 'a':
            # Remember the a.txt value; because the lines are sorted on
            # (key, flag), the matching b.txt line for this key comes next.
            valueA = wordvalue
        elif flag == 'b':
            valueB = wordvalue
            print("%s\t%s\t%s" % (wordkey, valueA, valueB))
            valueA = ''

if __name__ == "__main__":
    reducer()
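Before submitting anything to the cluster, the whole chain can be dry-run locally, with the shell's sort standing in for the shuffle (LC_ALL=C makes sort compare bytewise on key and flag, the way the streaming job will; the intermediate file names here are only illustrative):

$ cat a.txt | python mapperA.py > a_tagged.txt
$ cat b.txt | python mapperB.py > b_tagged.txt
$ cat a_tagged.txt b_tagged.txt | LC_ALL=C sort | python reducerjoin.py | head

Each output line should contain the key followed by the a.txt value and the b.txt value, tab-separated, e.g. aaa1, hdfs, mapreduce on one line.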
3.6 The run_streaming driver script (saved as run_streamingab.sh)
#!/bin/bash
HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"
INPUT_FILE_PATH_A="/input/join/a.txt"
INPUT_FILE_PATH_B="/input/join/b.txt"
OUTPUT_FILE_PATH_A="/output/join/a"
OUTPUT_FILE_PATH_B="/output/join/b"
OUTPUT_FILE_JOIN_PATH="/output/join/abjoin"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_A
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_B
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_JOIN_PATH
# step1: map a
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A \
-output $OUTPUT_FILE_PATH_A \
-jobconf "mapred.job.name=joinfinemapA" \
-mapper "python mapperA.py" \
-file "./mapperA.py"
# step2: map b
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_B \
-output $OUTPUT_FILE_PATH_B \
-jobconf "mapred.job.name=joinfinemapB" \
-mapper "python mapperB.py" \
-file "./mapperB.py"
# step3: join. stream.num.map.output.key.fields=2 makes the first two
# tab-separated fields (word + flag) the sort key, so for every key the
# tagged 'a' record sorts ahead of the 'b' record; num.key.fields.for.partition=1
# keeps the partition key to the first field so both records for a key
# reach the same reducer.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $OUTPUT_FILE_PATH_A,$OUTPUT_FILE_PATH_B \
-output $OUTPUT_FILE_JOIN_PATH \
-mapper "python mapperjoin.py" \
-reducer "python reducerjoin.py" \
-jobconf "mapred.job.name=joinfinemapAB" \
-jobconf "stream.num.map.output.key.fields=2" \
-jobconf "num.key.fields.for.partition=1" \
-file "./reducerjoin.py" \
-file "./mapperjoin.py"
3.7 Run the script
$ ./run_streamingab.sh
... (output omitted) ...
18/02/05 10:43:42 INFO streaming.StreamJob: Output directory: /output/join/b
... (output omitted) ...
18/02/05 10:44:12 INFO streaming.StreamJob: Output directory: /output/join/abjoin
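If anything looks off, the tagged intermediate output of the first two jobs can be spot-checked directly on HDFS before moving on, e.g.:

$ hadoop fs -text /output/join/a/part-* | head -3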
3.8 Check the results
$ hadoop fs -ls /output/join/abjoin
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2018-02-05 10:44 /output/join/abjoin/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 6276 2018-02-05 10:44 /output/join/abjoin/part-00000
$ hadoop fs -text /output/join/abjoin/part-00000|head
aaa1 hdfs mapreduce
aaa10 hdfs mapreduce
aaa100 hdfs mapreduce
aaa11 hdfs mapreduce
aaa12 hdfs mapreduce
aaa13 hdfs mapreduce
aaa14 hdfs mapreduce
aaa15 hdfs mapreduce
aaa16 hdfs mapreduce
aaa17 hdfs mapreduce
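As a final check, you can also count the joined lines to confirm that every key present in both files made it into the output:

$ hadoop fs -text /output/join/abjoin/part-00000 | wc -l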
4. Hadoop Streaming syntax reference
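For quick reference, the general shape of a streaming job submission looks like the following (all option values are placeholders):

hadoop jar /path/to/hadoop-streaming-<version>.jar \
-input <HDFS input path> \
-output <HDFS output path> \
-mapper <mapper command> \
-reducer <reducer command> \
-file <local script to ship with the job> \
-jobconf <name=value>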
Reposted from Balich's 51CTO blog; original link: http://blog.51cto.com/balich/2068884