hadoop mapreduce开发实践文件合并(join)

简介:

有两份不同的文件,他们有相同的键,把他们合并成一个文件;

1、文件内容

  • 合并前的文件a.txt和b.txt
$ head a.txt b.txt 
==> a.txt <==
aaa1    hdfs
aaa2    hdfs
aaa3    hdfs
aaa4    hdfs
aaa5    hdfs
aaa6    hdfs
aaa7    hdfs
aaa8    hdfs
aaa9    hdfs
aaa10   hdfs

==> b.txt <==
aaa1    mapreduce
aaa2    mapreduce
aaa3    mapreduce
aaa4    mapreduce
aaa5    mapreduce
aaa6    mapreduce
aaa7    mapreduce
aaa8    mapreduce
aaa9    mapreduce
aaa10   mapreduce

2、思路

1)、首先分别对a.txt和b.txt做一个map标签处理(用于区分a.txt和b.txt上的数据);
2)、mapjoin用于输出做过标签处理的a.txt和b.txt;
3)、用一个reducejoin的程序做类似wordcount处理,相同的key放在一起,把a.txt和b.txt上的value放在key后面输出;

3、实现

3.1、创建目录和上传数据

$ hadoop fs -mkdir /input/join
$ hadoop fs -mkdir /output/join/
$ hadoop fs -put a.txt b.txt /input/join

3.2、 mapperA程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        #print wordline

        print "%s\ta\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.3、mapperB程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]

        print "%s\tb\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.4、mapperjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        print line.strip()

if __name__ == "__main__":
    mapper()

3.5、reducerjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def reducer():
    valueA = ''
    for line in sys.stdin:
        wordkey, flag, wordvalue = line.strip().split('\t')
        if flag == 'a':
            valueA = wordvalue
        elif flag == 'b':
            valueB = wordvalue
            print "%s\t%s\t%s" % (wordkey,valueA,valueB)
            valueA = ''

if __name__ == "__main__":
    reducer()

3.6、run_streaming程序

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH_A="/input/join/a.txt"
INPUT_FILE_PATH_B="/input/join/b.txt"

OUTPUT_FILE_PATH_A="/output/join/a"
OUTPUT_FILE_PATH_B="/output/join/b"
OUTPUT_FILE_JOIN_PATH="/output/join/abjoin"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_A
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_B
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_JOIN_PATH

# step1: map a
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_A \
                -output $OUTPUT_FILE_PATH_A \
                -jobconf "mapred.job.name=joinfinemapA" \
                -mapper "python mapperA.py" \
                -file "./mapperA.py"
# step2: map b
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_PATH_B \
                -jobconf "mapred.job.name=joinfinemapB" \
                -mapper "python mapperB.py" \
                -file "./mapperB.py"

# step3: join
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $OUTPUT_FILE_PATH_A,$OUTPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_JOIN_PATH \
                -mapper "python mapperjoin.py" \
                -reducer "python reducerjoin.py" \
                -jobconf "mapred.job.name=joinfinemapAB" \
                -jobconf "stream.num.map.output.key.fields=2" \
                -jobconf "num.key.fields.for.partition=1" \
                -file "./reducerjoin.py" \
                -file "./mapperjoin.py"

3.7、执行程序

$ ./run_streamingab.sh 

...中间省略...
18/02/05 10:43:13 INFO streaming.StreamJob: Output directory: /output/join/a

...中间省略...

18/02/05 10:43:42 INFO streaming.StreamJob: Output directory: /output/join/b

...中间省略...

18/02/05 10:44:12 INFO streaming.StreamJob: Output directory: /output/join/abjoin

3.8、查看结果

$ hadoop fs -ls /output/join/abjoin
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-02-05 10:44 /output/join/abjoin/_SUCCESS
-rw-r--r--   1 hadoop supergroup       6276 2018-02-05 10:44 /output/join/abjoin/part-00000
$ hadoop fs -text /output/join/abjoin/part-00000|head
aaa1    hdfs    mapreduce
aaa10   hdfs    mapreduce
aaa100  hdfs    mapreduce
aaa11   hdfs    mapreduce
aaa12   hdfs    mapreduce
aaa13   hdfs    mapreduce
aaa14   hdfs    mapreduce
aaa15   hdfs    mapreduce
aaa16   hdfs    mapreduce
aaa17   hdfs    mapreduce

4、hadoop streaming 语法参考



本文转自 巴利奇 51CTO博客,原文链接:http://blog.51cto.com/balich/2068884
相关文章
|
30天前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
61 2
|
1月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
31 3
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
81 3
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
30 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
44 1
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
78 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
34 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
44 0
|
3月前
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
82 1
|
3月前
|
存储 分布式计算 算法
MapReduce 处理压缩文件的能力
【8月更文挑战第12天】
42 4

相关实验场景

更多