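We usually write MapReduce jobs either in Java or with Hadoop Streaming. I'm a Pythoner and don't know Java, so I had no choice but to write my MapReduce log analysis with Streaming. Here I want to introduce a module that is built on top of Streaming.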
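mrjob lets you write MapReduce jobs in Python and run them on several different platforms: on your local machine, on a Hadoop cluster, or on Amazon Elastic MapReduce (EMR).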
Install it with pip:
pip install mrjob
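The script I tested:

```python
# coding: utf-8
# xiaorui.cc
import re

from mrjob.job import MRJob

# Alternative pattern for counting ordinary words:
# WORD_RE = re.compile(r"[\w']+")
# Matches IPv4-looking strings such as 10.7.17.7
WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")


class MRWordFreqCount(MRJob):
    """Count how often each IP address appears in the log."""

    def mapper(self, _, line):
        # The input key is unused for plain-text input; emit (ip, 1) per match
        for ip in WORD_RE.findall(line):
            yield ip, 1

    def combiner(self, ip, counts):
        # Pre-aggregate counts on the mapper side
        yield ip, sum(counts)

    def reducer(self, ip, counts):
        # Final total for each IP
        yield ip, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()
```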
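The combiner can reuse the reducer's sum because addition is associative and commutative: summing the counts inside each input split first and then summing those partial totals gives the same result as summing everything at once. The sketch below checks this in plain Python without mrjob; mapper and sum_by_key are hypothetical stand-ins for the job's methods, and the two splits of log lines are made-up sample data.

```python
import re
from collections import Counter

# Same IP-address pattern as the job above
WORD_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")


def mapper(line):
    """Emit (ip, 1) for every IP-like token in a log line."""
    for ip in WORD_RE.findall(line):
        yield ip, 1


def sum_by_key(pairs):
    """Add up counts per key; plays the role of both combiner and reducer."""
    totals = Counter()
    for key, count in pairs:
        totals[key] += count
    return totals


# Two hypothetical input splits (made-up log lines)
splits = [
    ["query from 10.0.0.1 for 10.0.0.2", "query from 10.0.0.1"],
    ["query from 10.0.0.2", "query from 10.0.0.1"],
]

# With a combiner: aggregate inside each split, then reduce the partial totals
partials = [sum_by_key(pair for line in split for pair in mapper(line)) for split in splits]
combined = sum_by_key(pair for partial in partials for pair in partial.items())

# Without a combiner: reduce all mapper output in one go
direct = sum_by_key(pair for split in splits for line in split for pair in mapper(line))

print(combined == direct, dict(combined))
```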
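Usage is straightforward:
python 1.py -r inline input1 input2 input3 > out processes several input files and writes the combined result to the file out.
To simulate Hadoop locally: python 1.py -r local < input > output
Here the results are redirected into the file output; without the redirect, mrjob writes them to stdout.
To run on a Hadoop cluster: python 1.py -r hadoop < input > output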
Running the script:
```
[root@kspc ~]# python mo.py -r local < 10.7.17.7-dnsquery.log.1 > output
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/mo.root.20131224.040935.241241
reading from STDIN
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00000 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
> /usr/bin/python mo.py --step-num=0 --mapper /tmp/mo.root.20131224.040935.241241/input_part-00001 | sort | /usr/bin/python mo.py --step-num=0 --combiner > /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
Counters from step 1:
(no counters found)
writing to /tmp/mo.root.20131224.040935.241241/step-0-mapper-sorted
> sort /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00000 /tmp/mo.root.20131224.040935.241241/step-0-mapper_part-00001
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00000 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000
writing to /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
> /usr/bin/python mo.py --step-num=0 --reducer /tmp/mo.root.20131224.040935.241241/input_part-00001 > /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001
Counters from step 1:
(no counters found)
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00000 -> /tmp/mo.root.20131224.040935.241241/output/part-00000
Moving /tmp/mo.root.20131224.040935.241241/step-0-reducer_part-00001 -> /tmp/mo.root.20131224.040935.241241/output/part-00001
Streaming final output from /tmp/mo.root.20131224.040935.241241/output
removing tmp directory /tmp/mo.root.20131224.040935.241241
```
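Resource usage while the job was running:

Something I found interesting: mrjob actually shells out to the system sort command to do the sorting. You can see it in the log above, both in the "mapper | sort | combiner" pipelines and in the final "sort" over the mapper output files.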
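Sorting is what makes the streaming model work: a reducer reads its input one line at a time, and sorting puts all lines with the same key next to each other, so the reducer can finish one key before moving on to the next. Below is a minimal pure-Python sketch of that idea, assuming tab-separated "key<TAB>value" lines as Hadoop Streaming uses by default. It is an illustration of "mapper | sort | reducer", not mrjob's own code, and it does not reproduce mrjob's encoding of keys and values; streaming_reduce and the sample data are hypothetical.

```python
from itertools import groupby


def streaming_reduce(lines):
    """Sum values per key from tab-separated "key\tvalue" lines.

    Mimics `mapper | sort | reducer`: after sorting, equal keys are
    adjacent, so groupby can process one key at a time.
    """
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted(lines))
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, sum(int(value) for _, value in group)


# Hypothetical mapper output, in arbitrary order
mapper_output = ["10.0.0.2\t1", "10.0.0.1\t1", "10.0.0.2\t1", "10.0.0.1\t1", "10.0.0.1\t1"]

for ip, total in streaming_reduce(mapper_output):
    print(f"{ip}\t{total}")
```

Without the sort step, groupby would start a new group every time the key changes, so the same IP could be emitted several times with partial counts.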

To get a better feel for how mrjob is used, here is another example:

```python
from mrjob.job import MRJob
# from xiaorui.cc


class MRWordFrequencyCount(MRJob):

    # Emit one (key, value) pair per statistic for each input line
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    # Sum up the values for each key
    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
```
Here is the result:

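To see what this job computes, here is the same mapper and reducer logic applied by hand to two made-up input lines, in plain Python without mrjob. The functions below are hypothetical stand-ins for the job's methods; mrjob itself would write one key/value pair per output line rather than printing a dict.

```python
def mapper(line):
    """Same statistics as MRWordFrequencyCount.mapper, without mrjob."""
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def reducer(pairs):
    """Sum the values per key, like MRWordFrequencyCount.reducer."""
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return totals


# Two made-up input lines, assumed to arrive without trailing newlines
lines = ["hello mrjob", "map and reduce"]
result = reducer(pair for line in lines for pair in mapper(line))
print(result)  # {'chars': 25, 'words': 5, 'lines': 2}
```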
Below are some usage notes from the official mrjob documentation.
As you can see, it supports both HDFS and S3 storage!
Running your job different ways
The most basic way to run your job is on the command line:
$ python my_job.py input.txt
By default, output will be written to stdout.
You can pass input via stdin, but be aware that mrjob will just dump it to a file first:
$ python my_job.py < input.txt
You can pass multiple input files, mixed with stdin (using the - character):
$ python my_job.py input1.txt input2.txt - < input3.txt
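Using "-" to stand for stdin in a list of input files is a common Unix convention, and Python's standard-library fileinput module follows it too, so it is a handy way to mimic this input handling in your own scripts. The sketch below uses fileinput, not mrjob's code; count_lines is a hypothetical helper, and two temporary files stand in for input1.txt and input2.txt.

```python
import fileinput
import os
import shutil
import tempfile


def count_lines(paths):
    """Count lines across several inputs; a path of "-" is read from stdin."""
    with fileinput.input(files=paths) as lines:
        return sum(1 for _ in lines)


# Two temporary files standing in for input1.txt and input2.txt
tmpdir = tempfile.mkdtemp()
paths = []
for name, text in [("input1.txt", "a\nb\n"), ("input2.txt", "c\n")]:
    path = os.path.join(tmpdir, name)
    with open(path, "w") as f:
        f.write(text)
    paths.append(path)

# Appending "-" to paths would make fileinput read stdin after the two files
total = count_lines(paths)
print(total)
shutil.rmtree(tmpdir)
```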
By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!
You change the way the job is run with the -r/--runner option. You can use -rinline (the default), -rlocal, -rhadoop, or -remr.
To run your job in multiple subprocesses with a few Hadoop features simulated, use -rlocal.
To run it on your Hadoop cluster, use -rhadoop.
If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -remr.
Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:
$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt
This article was reposted from rfyiamcool's 51CTO blog. Original link: http://blog.51cto.com/rfyiamcool/1344368. Please contact the original author for permission to repost.