开发者社区> 技术小胖子> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

python调用mrjob实现hadoop的mapreduce日志解析

简介:
+关注继续查看

咱们一般写mapreduce是通过java和streaming来写的,身为pythoner的我,

java不会,没办法就用streaming来写mapreduce日志分析。 这里要介绍一个

模块,是基于streaming搞的东西。


mrjob 可以让用 Python 来编写 MapReduce 运算,并在多个不同平台上运行,你可以:

  • 使用纯 Python 编写多步的 MapReduce 作业

  • 在本机上进行测试

  • 在 Hadoop 集群上运行


pip 的安装方法:

1
pip install mrjob


我测试的脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#coding:utf-8
from mrjob.job import MRJob
import re
#xiaorui.cc
#WORD_RE = re.compile(r"[\w']+")
WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
class MRWordFreqCount(MRJob):
    def mapper(self, word, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1
    def combiner(self, word, counts):
        yield word, sum(counts)
    def reducer(self, word, counts):
        yield word, sum(counts)
if __name__ == '__main__':
    MRWordFreqCount.run()


用法算简单:

python i.py -r inline input1 input2 input3 > out 命令可以将处理多个文件的结果输出到out文件里面。

本地模拟hadoop运行:python 1.py -r local <input> output

这个会把结果输出到output里面,这个output必须写。

hadoop集群上运行:python 1.py -r hadoop <input> output


执行脚本 ~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@kspc ~]# python mo.py -r local  <10.7.17.7-dnsquery.log.1> output
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/mo.root.20131224.040935.241241
reading from STDIN
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00000 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00001 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
Counters from step 1:
  (no counters found)
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper-sorted
> sort /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000 /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00000 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00001 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
Counters from step 1:
  (no counters found)
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000 -> /tmp/mo.root.20131224.040935.241241/output/part-00000
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001 -> /tmp/mo.root.20131224.040935.241241/output/part-00001
Streaming final output from /tmp/mo.root.20131224.040935.241241/output
removing tmp directory /tmp/mo.root.20131224.040935.241241

执行的时候,资源的占用情况。

133630767.jpg


发现一个很奇妙的东西,mrjob居然调用shell下的sort来排序。。。。

133937941.jpg


为了更好的理解mrjob的用法,再来个例子。


1
2
3
4
5
6
7
8
9
10
11
12
13
from mrjob.job import MRJob
#from xiaorui.cc
class MRWordFrequencyCount(MRJob):
#把东西拼凑起来
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines"1
#总结kv
    def reducer(self, key, values):
        yield key, sum(values)
if __name__ == '__main__':
    MRWordFrequencyCount.run()


看下结果:

135509171.jpg

下面是官网给的一些个用法:


我们可以看到他是支持hdfs和s3存储的 !

Running your job different ways

The most basic way to run your job is on the command line:

$ python my_job.py input.txt

By default, output will be written to stdout.

You can pass input via stdin, but be aware that mrjob will just dump it to a file first:

$ python my_job.py < input.txt

You can pass multiple input files, mixed with stdin (using the - character):

$ python my_job.py input1.txt input2.txt - < input3.txt

By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!

You change the way the job is run with the -r/--runner option. You can use -rinline (the default), -rlocal-rhadoop, or -remr.

To run your job in multiple subprocesses with a few Hadoop features simulated, use -rlocal.

To run it on your Hadoop cluster, use -rhadoop.

If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -remr.

Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:

$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt


 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1344368,如需转载请自行联系原作者



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Python模块——glob模块详解
Python模块——glob模块详解
47 0
每周一个 Python 模块 | os.path
本文基于 Python3 编写测试。 os.path 模块是跨平台的,即使不打算在平台之间移植自己的程序也应该用 os.path,好处多多。
26 0
Python 模块实战| 学习笔记
快速学习 Python 模块实战
43 0
Python编程:glob模块进行文件名模式匹配
Python编程:glob模块进行文件名模式匹配
46 0
Python中fileinput模块使用
fileinput模块可以对一个或多个文件中的内容进行迭代、遍历等操作。该模块的input()函数有点类似文件 readlines()方法,区别在于前者是一个迭代对象,需要用for循环迭代,后者是一次性读取所有行。
753 0
21114
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载