开发者社区> 问答> 正文

用python写hadoop上的map-reduce:报错

本人比较熟悉python,对java不熟悉,大多数hadoop上的开发都是用java的,有公司使用python在hadoop上写map/reduce程序的吗?

展开
收起
kun坤 2020-06-14 09:12:03 685 0
1 条回答
写回答
取消 提交回答
  • 因为需要实际动手操作,你至少有一个Hadoop集群能正常运行,如果你没有Hadoop集群的话,下面的教程可以帮你建立一个。下面的教程是基于Ubuntu系统建立的,但是你可以将他们应用到其它Linux、Unix系统上。

        Running Hadoop On Ubuntu Linux (Multi-Node Cluster)
    

    Running Hadoop On Ubuntu Linux (Single-Node Cluster)

    Python MapReduce 源码解析

    以下Python代码所使用的技巧是基于Hadoop 数据流的接口实现的。借助该接口,我们可以通过标准输入(stdin)和标准输出(stdout)来传递Map和Reduce过程之间的数据。这里我们简单地使用Python的sys.stdin读取数据并通过sys.stdout输出数据,剩下的事情就全交给Hadoop啦~

    Map 将以下代码写入 /home/hduser/mapper.py 它会从stdin读取数据,分割其中的单词然后按行输出单词和其词频到stdout 。不过整个Map处理过程并不会统计每个单词出现总的次数,而是直接输出( , 1)元组。尽管某些单词会出现不止一次,但只要单词出现一次我们就输出一个( , 1)元组。在接下来的Reduce过程我们会统计单词出现的总的次数。当然,你也可以不这么做,但是出于“教程”这一目的,接下来的内容会按照这一思路来写。 :-)

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python

    mapper.py 01 #!/usr/bin/env python 02 03 import sys 04 05 # input comes from STDIN (standard input) 06 for line in sys . stdin : 07 # remove leading and trailing whitespace 08 line = line . strip() 09 # split the line into words 10 words = line . split() 11 # increase counters 12 for word in words : 13 # write the results to STDOUT (standard output); 14 # what we output here will be the input for the 15 # Reduce step, i.e. the input for reducer.py 16 # 17 # tab-delimited; the trivial word count is 1 18 print ' %s \t %s ' % ( word , 1)

    Reduce 将以下代码保存到/home/hduser/reducer.py 它会从stdin读取mapper.py的结果(因此mapper.py的输出格式应该与reducer.py的输入格式一致),然后统计每个单词出现的总的次数并输出到stdout

    注意:确保该文件是可执行的(chmod +x /home/hduser/mapper.py),否则会出问题。另外第一行记得添加 #!/usr/bin/env python

    reducer.py 01 #!/usr/bin/env python 02 03 from operator import itemgetter 04 import sys 05 06 current_word = None 07 current_count = 0 08 word = None 09 10 # input comes from STDIN 11 for line in sys . stdin : 12 # remove leading and trailing whitespace 13 line = line . strip() 14 15 # parse the input we got from mapper.py 16 word , count = line . split( ' \t ' , 1) 17 18 # convert count (currently a string) to int 19 try : 20 count = int( count) 21 except ValueError : 22 # count was not a number, so silently 23 # ignore/discard this line 24 continue 25 26 # this IF-switch only works because Hadoop sorts map output 27 # by key (here: word) before it is passed to the reducer 28 if current_word == word : 29 current_count += count 30 else : 31 if current_word : 32 # write result to STDOUT 33 print ' %s \t %s ' % ( current_word , current_count) 34 current_count = count 35 current_word = word 36 37 # do not forget to output the last word if needed! 38 if current_word == word : 39 print ' %s \t %s ' % ( current_word , current_count)

    测试你的代码(cat data | map | sort | reduce)

    建议在Hadoop上实际运行mapreduce之前先在本地测试mapper.py和reducer.py否侧可能会出现程序能正常执行但却完全没有输出结果或者输出不是你想要的结果。如果这发生的话,多半是你自己搞砸了......

    test 01 # very basic test 02 hduser@ubuntu:~ $ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py 03 foo 1 04 foo 1 05 quux 1 06 labs 1 07 foo 1 08 bar 1 09 quux 1 10 11 hduser@ubuntu:~ $ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py 12 bar 1 13 foo 3 14 labs 1 15 quux 2 16 17 # using one of the ebooks as example input 18 # (see below on where to get the ebooks) 19 hduser@ubuntu:~ $ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py 20 The 1 21 Project 1 22 Gutenberg 1 23 EBook 1 24 of 1 25 [... ] 26 (you get the idea)

    在Hadoop上运行 Python代码

    下载测试用的输入数据

    我们使用古腾堡项目中的三本电子书作为测试:

    The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson The Notebooks of Leonardo Da Vinci Ulysses by James Joyce 下载这些电子书的 txt格式,然后将这些文件保存到一个临时文件夹比如 /tmp/gutenberg

    1 hduser@ubuntu:~ $ ls -l /tmp/gutenberg/ 2 total 3604 3 -rw-r--r-- 1 hduser hadoop 674566 Feb 3 10:17 pg20417.txt 4 -rw-r--r-- 1 hduser hadoop 1573112 Feb 3 10:18 pg4300.txt 5 -rw-r--r-- 1 hduser hadoop 1423801 Feb 3 10:18 pg5000.txt 6 hduser@ubuntu:~ $

    将本地的数据拷贝到HDFS上

    在运行MapReduce任务之前,我们必须先将本地文件拷贝到Hadoop的文件系统上

    01 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg 02 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop dfs -ls 03 Found 1 items 04 drwxr-xr-x - hduser supergroup 0 2010-05-08 17:40 /user/hduser/gutenberg 05 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop dfs -ls /user/hduser/gutenberg 06 Found 3 items 07 -rw-r--r-- 3 hduser supergroup 674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt 08 -rw-r--r-- 3 hduser supergroup 1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt 09 -rw-r--r-- 3 hduser supergroup 1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt 10 hduser@ubuntu:/usr/local/hadoop $

    运行MapReduce任务

    现在一切都准备好了,我们可以通过Hadoop的数据流API来传送Map和Reduce过程中间的数据。

    1 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop jar contrib/streaming/hadoop-streaming.jar
    2 -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py
    3 -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py
    4 -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

    你可以通过指定 -D参数来更改一些Hadoop 设置,比如增加reducer数量

    1 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop jar contrib/streaming/hadoop-streaming.jar -D mapred.reduce.tasks =16 ...

    注意在命令行中可以接受mapred.reducetasks参数来指定reduce的个数,但是不能仅仅通过指定mapred.reduce.tasks来指定map tasks的个数。

    整个任务会从HDFS的路径/user/huser/gutenberg 上读取所有的文件,然后处理并输出到HDFS的路径/user/huser/gutnberg-output事实上Hadoop会给每个reducer创建一个输出文件,在我们的例子中只会输出一个文件因为输入的文件很小。上面命令行的输出示例如下:
    

    01 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop jar contrib/streaming/hadoop-streaming.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output 02 additionalConfSpec_:null 03 null =@@@userJobConfProps_.get(stream.shipped.hadoopstreaming 04 packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/ ] 05 [] /tmp/streamjob54544.jar tmpDir =null 06 [... ] INFO mapred.FileInputFormat: Total input paths to process : 7 07 [... ] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local ] 08 [... ] INFO streaming.StreamJob: Running job: job_200803031615_0021 09 [... ] 10 [... ] INFO streaming.StreamJob: map 0% reduce 0% 11 [... ] INFO streaming.StreamJob: map 43% reduce 0% 12 [... ] INFO streaming.StreamJob: map 86% reduce 0% 13 [... ] INFO streaming.StreamJob: map 100% reduce 0% 14 [... ] INFO streaming.StreamJob: map 100% reduce 33% 15 [... ] INFO streaming.StreamJob: map 100% reduce 70% 16 [... ] INFO streaming.StreamJob: map 100% reduce 77% 17 [... ] INFO streaming.StreamJob: map 100% reduce 100% 18 [... ] INFO streaming.StreamJob: Job complete: job_200803031615_0021 19 [... ] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output 20 hduser@ubuntu:/usr/local/hadoop $

    从上面的输出可以看到,Hadoop还为一些统计信息提供了一个基本的网页接口。在Hadoop集群运行时可以打开 http://localhost:50030

    查看HDFS路径/user/hduser/gutenberg-output上的文件

    你可以通过 dfs -cat命令查看输出文件的内容

    01 hduser@ubuntu:/usr/local/hadoop $ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000 02 "(Lo)cra" 1 03 "1490 1 04 "1498, " 1 05 "35 " 1 06 "40, " 1 07 "A 2 08 "AS-IS". 2 09 "A_ 1 10 "Absoluti 1 11 [... ] 12 hduser@ubuntu:/usr/local/hadoop $ 注意上面截图中单词两边的双引号并不是Hadoop自己加上去的,而是python程序将单词分割后生成的,不信的话可以查看完整的输出文件。

    使用python的迭代器和生成器改进mapper和reducer代码 上面的例子应该让你明白了怎样构建一个MapReduce应用,不过上面那些代码侧重于易读性,特别是针对Python程序员新手。然而,在真实的应用中,你可能需要使用Python的迭代器和生成器来优化你的代码。

    一般来说,迭代器和生成器(产生迭代的函数,比如包含yield输出语句)的优点是只有在你需要使用一个序列的元素时它才会生成该元素。这对于计算量很大或者内存开销很大的任务来说是很有用的。

    注意,下面的Map和Reduce脚本只有在Hadoop的环境下才能正常运行,也就是说使用本地的命令“cat Data | ./mapper.py |sort -k1,1 |./reucer.py”并不会正常运行,因为有些函数特性在不能离开Hadoop

    mapper.py 01 #!/usr/bin/env python 02 """A more advanced Mapper, using Python iterators and generators.""" 03 04 import sys 05 06 def read_input( file ): 07 for line in file : 08 # split the line into words 09 yield line . split() 10 11 def main( separator = ' \t ' ): 12 # input comes from STDIN (standard input) 13 data = read_input( sys . stdin) 14 for words in data : 15 # write the results to STDOUT (standard output); 16 # what we output here will be the input for the 17 # Reduce step, i.e. the input for reducer.py 18 # 19 # tab-delimited; the trivial word count is 1 20 for word in words : 21 print ' %s%s%d ' % ( word , separator , 1) 22 23 if name == "main" : 24 main()

    reducer.py 01 #!/usr/bin/env python 02 """A more advanced Reducer, using Python iterators and generators.""" 03 04 from itertools import groupby 05 from operator import itemgetter 06 import sys 07 08 def read_mapper_output( file , separator = ' \t ' ): 09 for line in file : 10 yield line . rstrip() . split( separator , 1) 11 12 def main( separator = ' \t ' ): 13 # input comes from STDIN (standard input) 14 data = read_mapper_output( sys . stdin , separator = separator) 15 # groupby groups multiple word-count pairs by word, 16 # and creates an iterator that returns consecutive keys and their group: 17 # current_word - string containing a word (the key) 18 # group - iterator yielding all ["<current_word>", "<count>"] items 19 for current_word , group in groupby( data , itemgetter( 0 )): 20 try : 21 total_count = sum( int( count) for current_word , count in group) 22 print " %s%s%d " % ( current_word , separator , total_count) 23 except ValueError : 24 # count was not a number, so silently discard this item 25 pass 26 27 if name == "main" : 28 main()

    2021-02-22 13:27:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
《构建Hadoop生态批流一体的实时数仓》 立即下载
零基础实现hadoop 迁移 MaxCompute 之 数据 立即下载
CIO 指南:如何在SAP软件架构中使用Hadoop 立即下载