Hadoop MapReduce Development in Practice: Distributing HDFS Files with Streaming

Overview:

1. Distributing HDFS files (-cacheFile)

Requirement: run a wordcount that counts only the words listed in a whitelist file. When that whitelist file is very large, it can first be uploaded to HDFS and then distributed to the compute nodes with -cacheFile.

-cacheFile hdfs://host:port/path/to/file#linkname    # this option caches the HDFS file on the compute nodes; the streaming program accesses it as ./linkname.

Approach: the mapper and reducer programs need no changes; the only difference is that the streaming job is launched with -cacheFile pointing at the file on HDFS.

1.1 Streaming command format

$HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar \
    -jobconf mapred.job.name="streaming_wordcount" \
    -jobconf mapred.job.priority=3 \
    -input /input/ \
    -output /output/ \
    -mapper "python mapper.py whc" \
    -reducer "python reducer.py" \
    -cacheFile "hdfs://master:9000/cache_file/wordwhite#whc"
    -file ./mapper.py \
    -file ./reducer.py 

Note: in -cacheFile "hdfs://master:9000/cache_file/wordwhite#whc", whc is the link name under which the HDFS file is exposed in each task's working directory, so -mapper "python mapper.py whc" can read it just as if it were a local file, as the sketch below illustrates.
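
As a minimal sketch (not from the original post; the file and argument names are illustrative), this is how a streaming program might read the cached file through that link name:

#!/usr/bin/env python
# load_whitelist.py (sketch): read the file distributed via -cacheFile
import sys

def load_whitelist(link_name):
    # the cached HDFS file appears as ./<link_name> in the task's working directory
    with open(link_name) as f:
        return set(line.strip() for line in f if line.strip())

if __name__ == '__main__':
    words = load_whitelist(sys.argv[1])   # e.g. "whc", the name after '#' in -cacheFile
    sys.stderr.write('loaded %d whitelist words\n' % len(words))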

1.2 Uploading wordwhite

$ hadoop fs -mkdir /input/cachefile
$ hadoop fs -put wordwhite  /input/cachefile
$ hadoop fs -ls /input/cachefile
Found 1 items
-rw-r--r--   1 hadoop supergroup         12 2018-01-26 15:02 /input/cachefile/wordwhite
$ hadoop fs -text hdfs://localhost:9000/input/cachefile/wordwhite
the
and
had

1.3 The run_streaming script

The mapper and reducer programs are the same ones used in the local-distribution example; a sketch of what they might look like is given below.
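
Since the original mapper.py and reducer.py are not reproduced in this post, here is a minimal sketch of what they could look like for the whitelist wordcount, reusing the access pattern shown above. The whitespace tokenization and the tab separator between map output key and value are assumptions:

#!/usr/bin/env python
# mapper.py (sketch): emit "<word>\t1" for every input word found in the whitelist
import sys

def load_whitelist(link_name):
    with open(link_name) as f:
        return set(line.strip() for line in f if line.strip())

def main():
    # sys.argv[1] is the -cacheFile link name, e.g. "WHF" in the script below
    wordwhite = load_whitelist(sys.argv[1])
    for line in sys.stdin:
        for word in line.strip().split():
            if word in wordwhite:
                print("%s\t1" % word)

if __name__ == '__main__':
    main()

#!/usr/bin/env python
# reducer.py (sketch): sum the 1s per word; input arrives sorted by key
import sys

def main():
    current_word, current_count = None, 0
    for line in sys.stdin:
        word, count = line.strip().split('\t', 1)
        if word == current_word:
            current_count += int(count)
        else:
            if current_word is not None:
                print("%s %s" % (current_word, current_count))
            current_word, current_count = word, int(count)
    if current_word is not None:
        print("%s %s" % (current_word, current_count))

if __name__ == '__main__':
    main()

Outside Hadoop they can be smoke-tested with a plain pipeline, assuming local copies of the input text and the whitelist, e.g. cat The_Man_of_Property | python mapper.py wordwhite | sort | python reducer.py.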

$ vim runstreaming_cachefile.sh 

#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-2.6.0-cdh5.13.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.13.0.jar"

INPUT_FILE_PATH="/input/The_Man_of_Property"
OUTPUT_FILE_PATH="/output/wordcount/wordwhitecachefiletest"

$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH \
                -output $OUTPUT_FILE_PATH \
                -jobconf "mapred.job.name=wordcount_wordwhite_cachefile_demo" \
                -mapper "python mapper.py WHF" \
                -reducer "python reducer.py" \
                -cacheFile "hdfs://localhost:9000/input/cachefile/wordwhite#WHF" \
                -file ./mapper.py \
                -file ./reducer.py

1.4 Running the job

$ ./runstreaming_cachefile.sh 
18/01/26 15:38:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/01/26 15:38:28 WARN streaming.StreamJob: -cacheFile option is deprecated, please use -files instead.
18/01/26 15:38:28 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/01/26 15:38:28 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [./mapper.py, ./reducer.py, /tmp/hadoop-unjar1709565523181962236/] [] /tmp/streamjob6164905989972408041.jar tmpDir=null
18/01/26 15:38:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/26 15:38:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/01/26 15:38:31 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/26 15:38:31 INFO mapreduce.JobSubmitter: number of splits:2
18/01/26 15:38:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516345010544_0012
18/01/26 15:38:32 INFO impl.YarnClientImpl: Submitted application application_1516345010544_0012
18/01/26 15:38:32 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1516345010544_0012/
18/01/26 15:38:32 INFO mapreduce.Job: Running job: job_1516345010544_0012
18/01/26 15:38:40 INFO mapreduce.Job: Job job_1516345010544_0012 running in uber mode : false
18/01/26 15:38:40 INFO mapreduce.Job:  map 0% reduce 0%
18/01/26 15:38:49 INFO mapreduce.Job:  map 50% reduce 0%
18/01/26 15:38:50 INFO mapreduce.Job:  map 100% reduce 0%
18/01/26 15:38:57 INFO mapreduce.Job:  map 100% reduce 100%
18/01/26 15:38:57 INFO mapreduce.Job: Job job_1516345010544_0012 completed successfully
18/01/26 15:38:57 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=73950
        FILE: Number of bytes written=582590
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636501
        HDFS: Number of bytes written=27
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12921
        Total time spent by all reduces in occupied slots (ms)=5641
        Total time spent by all map tasks (ms)=12921
        Total time spent by all reduce tasks (ms)=5641
        Total vcore-milliseconds taken by all map tasks=12921
        Total vcore-milliseconds taken by all reduce tasks=5641
        Total megabyte-milliseconds taken by all map tasks=13231104
        Total megabyte-milliseconds taken by all reduce tasks=5776384
    Map-Reduce Framework
        Map input records=2866
        Map output records=9243
        Map output bytes=55458
        Map output materialized bytes=73956
        Input split bytes=198
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=73956
        Reduce input records=9243
        Reduce output records=3
        Spilled Records=18486
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=360
        CPU time spent (ms)=3910
        Physical memory (bytes) snapshot=719896576
        Virtual memory (bytes) snapshot=8331550720
        Total committed heap usage (bytes)=602931200
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=636303
    File Output Format Counters 
        Bytes Written=27
18/01/26 15:38:57 INFO streaming.StreamJob: Output directory: /output/wordcount/wordwhitecachefiletest

1.5 Checking the results

$ hadoop fs -ls /output/wordcount/wordwhitecachefiletest
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-01-26 15:38 /output/wordcount/wordwhitecachefiletest/_SUCCESS
-rw-r--r--   1 hadoop supergroup         27 2018-01-26 15:38 /output/wordcount/wordwhitecachefiletest/part-00000

$ hadoop fs -text /output/wordcount/wordwhitecachefiletest/part-00000
and 2573
had 1526
the 5144
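
As a sanity check (not part of the original post), the same three counts can be recomputed locally, assuming local copies of The_Man_of_Property and wordwhite and the same whitespace tokenization as the mapper:

#!/usr/bin/env python
# check_counts.py (sketch): recompute the whitelist word counts without Hadoop
from collections import Counter

with open('wordwhite') as f:
    wordwhite = set(line.strip() for line in f if line.strip())

counts = Counter()
with open('The_Man_of_Property') as f:
    for line in f:
        counts.update(w for w in line.strip().split() if w in wordwhite)

for word in sorted(counts):
    print("%s %s" % (word, counts[word]))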

This completes the wordcount over a specified word list, with the whitelist file distributed from HDFS.

2. Hadoop streaming syntax reference

This article is reproduced from 巴利奇's 51CTO blog; original link: http://blog.51cto.com/balich/2065812
