Hadoop MapReduce Development Practice: Distributing Local Files with Streaming

Introduction:

Scenario: when the files a program depends on (data files, scripts, or configuration files) do not reside on the Hadoop cluster, they must first be distributed to the cluster before the computation can run.
Hadoop Streaming can distribute files and archives automatically; you only need to add the corresponding option (-file) when launching the streaming job.

When launching a streaming job, use the -file option to specify the local files that need to be distributed.

1. Local file distribution (-file)

1.1 Requirement: wordcount (count only the specified words: the, and, had)

Approach: the earlier wordcount example counted every word in the text. Building on it, add a whitelist-style file, wordwhite, that lists the only words to be counted. In the mapper, a word read from the input is emitted only if it appears in wordwhite, and those key/value pairs are then passed on to the reducer; the reducer does not need to change.

1.2 Programs and files

  • wordwhite (the words to count)
$ vim wordwhite 
the
and
had
  • The mapper program
$ vim mapper.py 

#!/usr/bin/env python

import sys

def read_wordwhite(filename):
    # Load the whitelist file into a set, one word per line.
    word_set = set()
    with open(filename, 'r') as fd:
        for line in fd:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper(wordwhite_file):
    word_set = read_wordwhite(wordwhite_file)

    # Emit "<word>\t1" only for words that appear in the whitelist.
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            if word != "" and (word in word_set):
                print "%s\t%s" % (word, 1)

if __name__ == "__main__":
    # The whitelist file name is passed as the first command-line argument.
    if len(sys.argv) > 1:
        mapper(sys.argv[1])
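
The mapper can be tested on its own before Hadoop is involved. A quick local check (it assumes Python 2 and the wordwhite file in the current directory; the input line is made up for illustration):

$ echo "the cat and the dog had the ball" | python mapper.py wordwhite
the	1
and	1
the	1
had	1
the	1
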
  • The reducer program
$ vim reducer.py

#!/usr/bin/env python

import sys

def reducer():
    current_word = None
    word_sum = 0

    # Input arrives sorted by key, so counts for the same word are adjacent.
    for line in sys.stdin:
        word_list = line.strip().split('\t')
        if len(word_list) < 2:
            continue
        word = word_list[0].strip()
        word_value = word_list[1].strip()

        if current_word is None:
            current_word = word
        if current_word != word:
            # A new key has started: emit the total for the previous word.
            print "%s\t%s" % (current_word, str(word_sum))
            current_word = word
            word_sum = 0
        word_sum += int(word_value)

    # Emit the total for the last word (if any input was received).
    if current_word is not None:
        print "%s\t%s" % (current_word, str(word_sum))

if __name__ == "__main__":
    reducer()
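
Before submitting to the cluster, the whole flow can also be simulated locally, with sort standing in for the shuffle/sort phase (a sanity check that assumes a local copy of The_Man_of_Property in the current directory):

$ cat The_Man_of_Property | python mapper.py wordwhite | sort -k1,1 | python reducer.py

If everything is wired up correctly, this prints the same three counts as the cluster job shown later.
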
  • The run_streaming script
$ vim runstreaming.sh

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH="/input/The_Man_of_Property"
OUTPUT_FILE_PATH="/output/wordcount/wordwhitetest"

# Submit the streaming job; the three -file options ship mapper.py, reducer.py and wordwhite to the task nodes.
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH \
                -output $OUTPUT_FILE_PATH \
                -mapper "python mapper.py wordwhite" \
                -reducer "python reducer.py" \
                -file ./mapper.py \
                -file ./reducer.py \
                -file ./wordwhite
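
The job log below warns that -file is deprecated in newer releases in favor of the generic -files option. A sketch of the equivalent invocation using -files (generic options must come before the streaming-specific options):

$HADOOP_CMD jar $STREAM_JAR_PATH \
                -files ./mapper.py,./reducer.py,./wordwhite \
                -input $INPUT_FILE_PATH \
                -output $OUTPUT_FILE_PATH \
                -mapper "python mapper.py wordwhite" \
                -reducer "python reducer.py"
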
  • Run the job
    First, upload the test file The_Man_of_Property to HDFS and create the wordcount output directory:
    $ hadoop fs -put ./The_Man_of_Property /input/
    $ hadoop fs -mkdir /output/wordcount

    Note: this example runs on a pseudo-distributed Hadoop 2.6 environment.
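
    (Optional) You can verify the upload before launching the job:
    $ hadoop fs -ls /input/

Then run the streaming script:
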

$ ./runstreaming.sh 
18/01/26 13:30:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py, ./wordwhite, /tmp/hadoop-unjar7204532228900236640/] [] /tmp/streamjob7580948745512643345.jar tmpDir=null
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/26 13:30:31 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/26 13:30:31 INFO mapreduce.JobSubmitter: number of splits:2
18/01/26 13:30:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516345010544_0008
18/01/26 13:30:32 INFO impl.YarnClientImpl: Submitted application application_1516345010544_0008
18/01/26 13:30:32 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1516345010544_0008/
18/01/26 13:30:32 INFO mapreduce.Job: Running job: job_1516345010544_0008
18/01/26 13:30:40 INFO mapreduce.Job: Job job_1516345010544_0008 running in uber mode : false
18/01/26 13:30:40 INFO mapreduce.Job:  map 0% reduce 0%
18/01/26 13:30:50 INFO mapreduce.Job:  map 50% reduce 0%
18/01/26 13:30:51 INFO mapreduce.Job:  map 100% reduce 0%
18/01/26 13:30:58 INFO mapreduce.Job:  map 100% reduce 100%
18/01/26 13:30:59 INFO mapreduce.Job: Job job_1516345010544_0008 completed successfully
18/01/26 13:30:59 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=73950
        FILE: Number of bytes written=582815
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636501
        HDFS: Number of bytes written=27
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12815
        Total time spent by all reduces in occupied slots (ms)=5251
        Total time spent by all map tasks (ms)=12815
        Total time spent by all reduce tasks (ms)=5251
        Total vcore-milliseconds taken by all map tasks=12815
        Total vcore-milliseconds taken by all reduce tasks=5251
        Total megabyte-milliseconds taken by all map tasks=13122560
        Total megabyte-milliseconds taken by all reduce tasks=5377024
    Map-Reduce Framework
        Map input records=2866
        Map output records=9243
        Map output bytes=55458
        Map output materialized bytes=73956
        Input split bytes=198
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=73956
        Reduce input records=9243
        Reduce output records=3
        Spilled Records=18486
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=332
        CPU time spent (ms)=3700
        Physical memory (bytes) snapshot=707719168
        Virtual memory (bytes) snapshot=8333037568
        Total committed heap usage (bytes)=598736896
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=27
18/01/26 13:30:59 INFO streaming.StreamJob: Output directory: /output/wordcount/wordwhitetest
  • View the results
    $ hadoop fs -ls /output/wordcount/wordwhitetest/
    Found 2 items
    -rw-r--r--   1 centos supergroup          0 2018-01-26 13:30 /output/wordcount/wordwhitetest/_SUCCESS
    -rw-r--r--   1 centos supergroup         27 2018-01-26 13:30 /output/wordcount/wordwhitetest/part-00000
    $ hadoop fs -text /output/wordcount/wordwhitetest/part-00000
    and 2573
    had 1526
    the 5144

That completes the wordcount restricted to the specified words.

2. Hadoop Streaming syntax reference
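
The options used in the script above are the ones most commonly needed. A non-exhaustive summary of frequently used Hadoop Streaming options (see the official Hadoop Streaming documentation for the complete list):

  • -input <path>: HDFS path of the job input
  • -output <path>: HDFS directory for the job output (must not already exist)
  • -mapper <cmd>: command or Java class to run as the mapper
  • -reducer <cmd>: command or Java class to run as the reducer
  • -file <file>: local file shipped with the job (deprecated in favor of the generic -files option)
  • -combiner <cmd>: command or Java class to run as the combiner
  • -numReduceTasks <n>: number of reduce tasks
  • -D <property=value>: generic option for setting configuration properties; generic options must precede the streaming options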


This article is reposted from 巴利奇's 51CTO blog. Original link: http://blog.51cto.com/balich/2065424