hadoop mapreduce开发实践文件合并(join)

简介:

有两份不同的文件,他们有相同的键,把他们合并成一个文件;

1、文件内容

  • 合并前的文件a.txt和b.txt
$ head a.txt b.txt 
==> a.txt <==
aaa1    hdfs
aaa2    hdfs
aaa3    hdfs
aaa4    hdfs
aaa5    hdfs
aaa6    hdfs
aaa7    hdfs
aaa8    hdfs
aaa9    hdfs
aaa10   hdfs

==> b.txt <==
aaa1    mapreduce
aaa2    mapreduce
aaa3    mapreduce
aaa4    mapreduce
aaa5    mapreduce
aaa6    mapreduce
aaa7    mapreduce
aaa8    mapreduce
aaa9    mapreduce
aaa10   mapreduce

2、思路

1)、首先分别对a.txt和b.txt做一个map标签处理(用于区分a.txt和b.txt上的数据);
2)、mapjoin用于输出做过标签处理的a.txt和b.txt;
3)、用一个reducejoin的程序做类似wordcount处理,相同的key放在一起,把a.txt和b.txt上的value放在key后面输出;

3、实现

3.1、创建目录和上传数据

$ hadoop fs -mkdir /input/join
$ hadoop fs -mkdir /output/join/
$ hadoop fs -put a.txt b.txt /input/join

3.2、 mapperA程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        #print wordline

        print "%s\ta\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.3、mapperB程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]

        print "%s\tb\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.4、mapperjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        print line.strip()

if __name__ == "__main__":
    mapper()

3.5、reducerjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def reducer():
    valueA = ''
    for line in sys.stdin:
        wordkey, flag, wordvalue = line.strip().split('\t')
        if flag == 'a':
            valueA = wordvalue
        elif flag == 'b':
            valueB = wordvalue
            print "%s\t%s\t%s" % (wordkey,valueA,valueB)
            valueA = ''

if __name__ == "__main__":
    reducer()

3.6、run_streaming程序

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH_A="/input/join/a.txt"
INPUT_FILE_PATH_B="/input/join/b.txt"

OUTPUT_FILE_PATH_A="/output/join/a"
OUTPUT_FILE_PATH_B="/output/join/b"
OUTPUT_FILE_JOIN_PATH="/output/join/abjoin"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_A
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_B
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_JOIN_PATH

# step1: map a
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_A \
                -output $OUTPUT_FILE_PATH_A \
                -jobconf "mapred.job.name=joinfinemapA" \
                -mapper "python mapperA.py" \
                -file "./mapperA.py"
# step2: map b
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_PATH_B \
                -jobconf "mapred.job.name=joinfinemapB" \
                -mapper "python mapperB.py" \
                -file "./mapperB.py"

# step3: join
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $OUTPUT_FILE_PATH_A,$OUTPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_JOIN_PATH \
                -mapper "python mapperjoin.py" \
                -reducer "python reducerjoin.py" \
                -jobconf "mapred.job.name=joinfinemapAB" \
                -jobconf "stream.num.map.output.key.fields=2" \
                -jobconf "num.key.fields.for.partition=1" \
                -file "./reducerjoin.py" \
                -file "./mapperjoin.py"

3.7、执行程序

$ ./run_streamingab.sh 

...中间省略...
18/02/05 10:43:13 INFO streaming.StreamJob: Output directory: /output/join/a

...中间省略...

18/02/05 10:43:42 INFO streaming.StreamJob: Output directory: /output/join/b

...中间省略...

18/02/05 10:44:12 INFO streaming.StreamJob: Output directory: /output/join/abjoin

3.8、查看结果

$ hadoop fs -ls /output/join/abjoin
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-02-05 10:44 /output/join/abjoin/_SUCCESS
-rw-r--r--   1 hadoop supergroup       6276 2018-02-05 10:44 /output/join/abjoin/part-00000
$ hadoop fs -text /output/join/abjoin/part-00000|head
aaa1    hdfs    mapreduce
aaa10   hdfs    mapreduce
aaa100  hdfs    mapreduce
aaa11   hdfs    mapreduce
aaa12   hdfs    mapreduce
aaa13   hdfs    mapreduce
aaa14   hdfs    mapreduce
aaa15   hdfs    mapreduce
aaa16   hdfs    mapreduce
aaa17   hdfs    mapreduce

4、hadoop streaming 语法参考



本文转自 巴利奇 51CTO博客,原文链接:http://blog.51cto.com/balich/2068884
相关文章
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
377 2
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
347 3
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
160 3
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
218 2
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
229 1
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
360 1
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
328 1
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
276 1
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
433 1