hadoop mapreduce开发实践文件合并(join)

简介:

有两份不同的文件,他们有相同的键,把他们合并成一个文件;

1、文件内容

  • 合并前的文件a.txt和b.txt
$ head a.txt b.txt 
==> a.txt <==
aaa1    hdfs
aaa2    hdfs
aaa3    hdfs
aaa4    hdfs
aaa5    hdfs
aaa6    hdfs
aaa7    hdfs
aaa8    hdfs
aaa9    hdfs
aaa10   hdfs

==> b.txt <==
aaa1    mapreduce
aaa2    mapreduce
aaa3    mapreduce
aaa4    mapreduce
aaa5    mapreduce
aaa6    mapreduce
aaa7    mapreduce
aaa8    mapreduce
aaa9    mapreduce
aaa10   mapreduce

2、思路

1)、首先分别对a.txt和b.txt做一个map标签处理(用于区分a.txt和b.txt上的数据);
2)、mapjoin用于输出做过标签处理的a.txt和b.txt;
3)、用一个reducejoin的程序做类似wordcount处理,相同的key放在一起,把a.txt和b.txt上的value放在key后面输出;

3、实现

3.1、创建目录和上传数据

$ hadoop fs -mkdir /input/join
$ hadoop fs -mkdir /output/join/
$ hadoop fs -put a.txt b.txt /input/join

3.2、 mapperA程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]
        #print wordline

        print "%s\ta\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.3、mapperB程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        wordline = line.strip().split()
        wordkey = wordline[0]
        wordvalue = wordline[1]

        print "%s\tb\t%s" % (wordkey, wordvalue)

if __name__ == "__main__":
    mapper()

3.4、mapperjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def mapper():
    for line in sys.stdin:
        print line.strip()

if __name__ == "__main__":
    mapper()

3.5、reducerjoin程序

#!/usr/bin/env python
# -*- conding:utf-8 -*-

import sys

def reducer():
    valueA = ''
    for line in sys.stdin:
        wordkey, flag, wordvalue = line.strip().split('\t')
        if flag == 'a':
            valueA = wordvalue
        elif flag == 'b':
            valueB = wordvalue
            print "%s\t%s\t%s" % (wordkey,valueA,valueB)
            valueA = ''

if __name__ == "__main__":
    reducer()

3.6、run_streaming程序

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH_A="/input/join/a.txt"
INPUT_FILE_PATH_B="/input/join/b.txt"

OUTPUT_FILE_PATH_A="/output/join/a"
OUTPUT_FILE_PATH_B="/output/join/b"
OUTPUT_FILE_JOIN_PATH="/output/join/abjoin"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_A
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_PATH_B
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_FILE_JOIN_PATH

# step1: map a
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_A \
                -output $OUTPUT_FILE_PATH_A \
                -jobconf "mapred.job.name=joinfinemapA" \
                -mapper "python mapperA.py" \
                -file "./mapperA.py"
# step2: map b
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_PATH_B \
                -jobconf "mapred.job.name=joinfinemapB" \
                -mapper "python mapperB.py" \
                -file "./mapperB.py"

# step3: join
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $OUTPUT_FILE_PATH_A,$OUTPUT_FILE_PATH_B \
                -output $OUTPUT_FILE_JOIN_PATH \
                -mapper "python mapperjoin.py" \
                -reducer "python reducerjoin.py" \
                -jobconf "mapred.job.name=joinfinemapAB" \
                -jobconf "stream.num.map.output.key.fields=2" \
                -jobconf "num.key.fields.for.partition=1" \
                -file "./reducerjoin.py" \
                -file "./mapperjoin.py"

3.7、执行程序

$ ./run_streamingab.sh 

...中间省略...
18/02/05 10:43:13 INFO streaming.StreamJob: Output directory: /output/join/a

...中间省略...

18/02/05 10:43:42 INFO streaming.StreamJob: Output directory: /output/join/b

...中间省略...

18/02/05 10:44:12 INFO streaming.StreamJob: Output directory: /output/join/abjoin

3.8、查看结果

$ hadoop fs -ls /output/join/abjoin
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-02-05 10:44 /output/join/abjoin/_SUCCESS
-rw-r--r--   1 hadoop supergroup       6276 2018-02-05 10:44 /output/join/abjoin/part-00000
$ hadoop fs -text /output/join/abjoin/part-00000|head
aaa1    hdfs    mapreduce
aaa10   hdfs    mapreduce
aaa100  hdfs    mapreduce
aaa11   hdfs    mapreduce
aaa12   hdfs    mapreduce
aaa13   hdfs    mapreduce
aaa14   hdfs    mapreduce
aaa15   hdfs    mapreduce
aaa16   hdfs    mapreduce
aaa17   hdfs    mapreduce

4、hadoop streaming 语法参考



本文转自 巴利奇 51CTO博客,原文链接:http://blog.51cto.com/balich/2068884
相关文章
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
322 2
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
363 0
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
223 0
|
10月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
545 79
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
545 6
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
742 4
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
602 2
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
511 1
|
存储 分布式计算 资源调度
两万字长文向你解密大数据组件 Hadoop
两万字长文向你解密大数据组件 Hadoop
647 11

相关实验场景

更多