用python写MapReduce函数——以WordCount为例

简介:

      尽管Hadoop框架是用java写的,但是Hadoop程序不限于java,可以用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。

      例子的目的是统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:文本(每行包括单词和单词的词频,两者之间用'\t'隔开)

 

1. Python MapReduce 代码

      使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。

      我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。

1.1 Map阶段:mapper.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/mapper.py

复制代码
#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)
复制代码

文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出<word> 1。在我们的例子中,我们让随后的Reduce阶段做统计工作。

为了是脚本可执行,增加mapper.py的可执行权限

chmod +x hadoop-0.20.2/test/code/mapper.py

1.2 Reduce阶段:reducer.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/reducer.py

复制代码
#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是数字的话,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘记最后的输出
    print "%s\t%s" % (current_word, current_count)
复制代码

文件会读取mapper.py 的结果作为reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增加reducer.py的可执行权限

chmod +x hadoop-0.20.2/test/code/reducer.py

细节:split(chara, m),第二个参数的作用,下面的例子很给力

复制代码
str = 'server=mpilgrim&ip=10.10.10.10&port=8080'
print str.split('=', 1)[0]  #1表示=只截一次
print str.split('=', 1)[1]
print str.split('=')[0]
print str.split('=')[1]
复制代码

输出

1
2
3
4
server
mpilgrim&ip = 10.10 . 10.10 &port = 8080
server
mpilgrim&ip 

1.3 测试代码(cat data | map | sort | reduce)

这里建议大家在提交给MapReduce job之前在本地测试mapper.py 和reducer.py脚本。否则jobs可能会成功执行,但是结果并非自己想要的。

功能性测试mapper.py 和 reducer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[rte@hadoop - 0.20 . 2 ]$cd test / code
[rte@code]$echo  "foo foo quux labs foo bar quux"  | . / mapper.py
foo  1
foo  1
quux     1
labs     1
foo  1
bar  1
quux     1
[rte@code]$echo  "foo foo quux labs foo bar quux"  | . / mapper.py | sort  - k1, 1  | . / reducer.py
bar  1
foo  3
labs     1
quux     2

 细节:sort -k1,1  参数何意?

-k, -key=POS1[,POS2]     键以pos1开始,以pos2结束

有时候经常使用sort来排序,需要预处理把需要排序的field语言在最前面。实际上这是

完全没有必要的,利用-k参数就足够了。

比如sort all

1
2
3
4
5
1  4
2  3
3  2
4  1
5  0

如果sort -k 2的话,那么执行结果就是

1
2
3
4
5
5  0
4  1
3  2
2  3
1  4

 

2. 在Hadoop上运行python代码

2.1 数据准备

下载以下三个文件的

我把上面三个文件放到hadoop-0.20.2/test/datas/目录下

2.2 运行

把本地的数据文件拷贝到分布式文件系统HDFS中。

bin/hadoop dfs -copyFromLocal /test/datas  hdfs_in

查看

bin/hadoop dfs -ls

结果

1
drwxr - xr - x    -  rte supergroup           0  2014 - 07 - 05  15 : 40  / user / rte / hdfs_in

查看具体的文件

bin/hadoop dfs -ls /user/rte/hdfs_in

执行MapReduce job

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file test/code/mapper.py     -mapper test/code/mapper.py \
-file test/code/reducer.py    -reducer test/code/reducer.py \
-input /user/rte/hdfs_in/*    -output /user/rte/hdfs_out

实例输出

查看输出结果是否在目标目录/user/rte/hdfs_out

bin/hadoop dfs -ls /user/rte/hdfs_out

输出

1
2
3
Found  2  items
drwxr-xr-x   - rte supergroup           0  2014 -07 -05  20: 51  /user/rte/hdfs_out 2 /_logs
-rw-r--r--    2  rte supergroup      880829  2014 -07 -05  20: 51  /user/rte/hdfs_out 2 /part -00000

查看结果

bin/hadoop dfs -cat /user/rte/hdfs_out2/part-00000

输出

以上已经达成目的了,但是可以利用python迭代器和生成器优化

 

3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码

3.1 python中的迭代器和生成器

   看这

3.2 优化Mapper 和 Reducer代码

mapper.py

复制代码
#!/usr/bin/env python
import sys
def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print "%s%s%d" % (word, separator, 1)

if __name__ == "__main__":
    main()
复制代码

reducer.py

复制代码
#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator = '\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator = '\t'):
    data = read_mapper_output(sys.stdin, separator = separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except valueError:
            pass

if __name__ == "__main__":
    main()
复制代码

细节:groupby

复制代码
from itertools import groupby
from operator import itemgetter

things = [('2009-09-02', 11),
          ('2009-09-02', 3),
          ('2009-09-03', 10),
          ('2009-09-03', 4),
          ('2009-09-03', 22),
          ('2009-09-06', 33)]

sss = groupby(things, itemgetter(0))
for key, items in sss:
    print key
    for subitem in items:
        print subitem
    print '-' * 20
复制代码

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
>>>
2009 -09 -02
( '2009-09-02' 11 )
( '2009-09-02' 3 )
--------------------
2009 -09 -03
( '2009-09-03' 10 )
( '2009-09-03' 4 )
( '2009-09-03' 22 )
--------------------
2009 -09 -06
( '2009-09-06' 33 )
--------------------

注 

  • groupby(things, itemgetter(0)) 以第0列为排序目标
  • groupby(things, itemgetter(1))以第1列为排序目标
  • groupby(things)以整行为排序目标

4. 参考

python中的split函数中的参数问题

Writing an Hadoop MapReduce Program in Python

shell的sort命令的-k参数 

 





本文转自jihite博客园博客,原文链接:http://www.cnblogs.com/kaituorensheng/p/3826114.html,如需转载请自行联系原作者


相关文章
|
1月前
|
Python
【python从入门到精通】-- 第五战:函数大总结
【python从入门到精通】-- 第五战:函数大总结
68 0
|
1月前
|
Python
Python之函数详解
【10月更文挑战第12天】
Python之函数详解
|
1月前
|
存储 数据安全/隐私保护 索引
Python 散列类型三以及函数基础
【10月更文挑战第11天】
Python 散列类型三以及函数基础
|
25天前
|
测试技术 数据安全/隐私保护 Python
探索Python中的装饰器:简化和增强你的函数
【10月更文挑战第24天】在Python编程的海洋中,装饰器是那把可以令你的代码更简洁、更强大的魔法棒。它们不仅能够扩展函数的功能,还能保持代码的整洁性。本文将带你深入了解装饰器的概念、实现方式以及如何通过它们来提升你的代码质量。让我们一起揭开装饰器的神秘面纱,学习如何用它们来打造更加优雅和高效的代码。
|
27天前
|
弹性计算 安全 数据处理
Python高手秘籍:列表推导式与Lambda函数的高效应用
列表推导式和Lambda函数是Python中强大的工具。列表推导式允许在一行代码中生成新列表,而Lambda函数则是用于简单操作的匿名函数。通过示例展示了如何使用这些工具进行数据处理和功能实现,包括生成偶数平方、展平二维列表、按长度排序单词等。这些工具在Python编程中具有高度的灵活性和实用性。
|
30天前
|
Python
python的时间操作time-函数介绍
【10月更文挑战第19天】 python模块time的函数使用介绍和使用。
31 4
|
1月前
|
存储 Python
[oeasy]python038_ range函数_大小写字母的起止范围_start_stop
本文介绍了Python中`range`函数的使用方法及其在生成大小写字母序号范围时的应用。通过示例展示了如何利用`range`和`for`循环输出指定范围内的数字,重点讲解了小写和大写字母对应的ASCII码值范围,并解释了`range`函数的参数(start, stop)以及为何不包括stop值的原因。最后,文章留下了关于为何`range`不包含stop值的问题,留待下一次讨论。
22 1
|
1月前
|
索引 Python
Python中的其他内置函数有哪些
【10月更文挑战第12天】Python中的其他内置函数有哪些
16 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
98 3
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
50 1
下一篇
无影云桌面