python调用mrjob实现hadoop的mapreduce日志解析

简介:

咱们一般写mapreduce是通过java和streaming来写的,身为pythoner的我,

java不会,没办法就用streaming来写mapreduce日志分析。 这里要介绍一个

模块,是基于streaming搞的东西。


mrjob 可以让用 Python 来编写 MapReduce 运算,并在多个不同平台上运行,你可以:

  • 使用纯 Python 编写多步的 MapReduce 作业

  • 在本机上进行测试

  • 在 Hadoop 集群上运行


pip 的安装方法:

1
pip install mrjob


我测试的脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#coding:utf- 8
from mrjob.job  import  MRJob
import  re
#xiaorui.cc
#WORD_RE = re.compile(r "[\w']+" )
WORD_RE = re.compile(r "\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" )
class  MRWordFreqCount(MRJob):
     def mapper(self, word, line):
         for  word  in  WORD_RE.findall(line):
             yield word.lower(),  1
     def combiner(self, word, counts):
         yield word, sum(counts)
     def reducer(self, word, counts):
         yield word, sum(counts)
if  __name__ ==  '__main__' :
     MRWordFreqCount.run()


用法算简单:

python i.py -r inline input1 input2 input3 > out 命令可以将处理多个文件的结果输出到out文件里面。

本地模拟hadoop运行:python 1.py -r local <input> output

这个会把结果输出到output里面,这个output必须写。

hadoop集群上运行:python 1.py -r hadoop <input> output


执行脚本 ~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@kspc ~]# python mo.py -r local  < 10.7 . 17.7 -dnsquery.log. 1 > output
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/mo.root. 20131224.040935 . 241241
reading from STDIN
writing to /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00000
> /usr/bin/python mo.py --step-num= 0  --mapper /tmp/mo.root. 20131224.040935 . 241241 /input_part- 00000  | sort | /usr/bin/python mo.py --step-num= 0  --combiner > /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00000
writing to /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00001
> /usr/bin/python mo.py --step-num= 0  --mapper /tmp/mo.root. 20131224.040935 . 241241 /input_part- 00001  | sort | /usr/bin/python mo.py --step-num= 0  --combiner > /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00001
Counters from step  1 :
   (no counters found)
writing to /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper-sorted
> sort /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00000  /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -mapper_part- 00001
writing to /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00000
> /usr/bin/python mo.py --step-num= 0  --reducer /tmp/mo.root. 20131224.040935 . 241241 /input_part- 00000  > /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00000
writing to /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00001
> /usr/bin/python mo.py --step-num= 0  --reducer /tmp/mo.root. 20131224.040935 . 241241 /input_part- 00001  > /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00001
Counters from step  1 :
   (no counters found)
Moving /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00000  -> /tmp/mo.root. 20131224.040935 . 241241 /output/part- 00000
Moving /tmp/mo.root. 20131224.040935 . 241241 /step- 0 -reducer_part- 00001  -> /tmp/mo.root. 20131224.040935 . 241241 /output/part- 00001
Streaming  final  output from /tmp/mo.root. 20131224.040935 . 241241 /output
removing tmp directory /tmp/mo.root. 20131224.040935 . 241241

执行的时候,资源的占用情况。

133630767.jpg


发现一个很奇妙的东西,mrjob居然调用shell下的sort来排序。。。。

133937941.jpg


为了更好的理解mrjob的用法,再来个例子。


1
2
3
4
5
6
7
8
9
10
11
12
13
from mrjob.job  import  MRJob
#from xiaorui.cc
class  MRWordFrequencyCount(MRJob):
#把东西拼凑起来
     def mapper(self, _, line):
         yield  "chars" , len(line)
         yield  "words" , len(line.split())
         yield  "lines" 1
#总结kv
     def reducer(self, key, values):
         yield key, sum(values)
if  __name__ ==  '__main__' :
     MRWordFrequencyCount.run()


看下结果:

135509171.jpg

下面是官网给的一些个用法:


我们可以看到他是支持hdfs和s3存储的 !

Running your job different ways

The most basic way to run your job is on the command line:

$ python my_job.py input.txt

By default, output will be written to stdout.

You can pass input via stdin, but be aware that mrjob will just dump it to a file first:

$ python my_job.py < input.txt

You can pass multiple input files, mixed with stdin (using the - character):

$ python my_job.py input1.txt input2.txt - < input3.txt

By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!

You change the way the job is run with the -r/--runner option. You can use -rinline (the default), -rlocal-rhadoop, or -remr.

To run your job in multiple subprocesses with a few Hadoop features simulated, use -rlocal.

To run it on your Hadoop cluster, use -rhadoop.

If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -remr.

Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:

$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt


 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1344368 ,如需转载请自行联系原作者



相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
23天前
|
存储 缓存 算法
Python中collections模块的deque双端队列:深入解析与应用
在Python的`collections`模块中,`deque`(双端队列)是一个线程安全、快速添加和删除元素的双端队列数据类型。它支持从队列的两端添加和弹出元素,提供了比列表更高的效率,特别是在处理大型数据集时。本文将详细解析`deque`的原理、使用方法以及它在各种场景中的应用。
|
4天前
|
数据采集 机器学习/深度学习 数据挖掘
Python数据清洗与预处理面试题解析
【4月更文挑战第17天】本文介绍了Python数据清洗与预处理在面试中的常见问题,包括Pandas基础操作、异常值处理和特征工程。通过示例代码展示了数据读取、筛选、合并、分组统计、离群点检测、缺失值和重复值处理、特征缩放、编码、转换和降维。强调了易错点,如忽视数据质量检查、盲目处理数据、数据隐私保护、过度简化特征关系和忽视模型输入要求。掌握这些技能和策略将有助于在面试中脱颖而出。
23 8
|
6天前
|
调度 Python
Python多线程、多进程与协程面试题解析
【4月更文挑战第14天】Python并发编程涉及多线程、多进程和协程。面试中,对这些概念的理解和应用是评估候选人的重要标准。本文介绍了它们的基础知识、常见问题和应对策略。多线程在同一进程中并发执行,多进程通过进程间通信实现并发,协程则使用`asyncio`进行轻量级线程控制。面试常遇到的问题包括并发并行混淆、GIL影响多线程性能、进程间通信不当和协程异步IO理解不清。要掌握并发模型,需明确其适用场景,理解GIL、进程间通信和协程调度机制。
25 0
|
6天前
|
API Python
Python模块化编程:面试题深度解析
【4月更文挑战第14天】了解Python模块化编程对于构建大型项目至关重要,它涉及代码组织、复用和维护。本文深入探讨了模块、包、导入机制、命名空间和作用域等基础概念,并列举了面试中常见的模块导入混乱、不适当星号导入等问题,强调了避免循环依赖、合理使用`__init__.py`以及理解模块作用域的重要性。掌握这些知识将有助于在面试中自信应对模块化编程的相关挑战。
19 0
|
10天前
|
SQL API 数据库
Python中的SQLAlchemy框架:深度解析与实战应用
【4月更文挑战第13天】在Python的众多ORM(对象关系映射)框架中,SQLAlchemy以其功能强大、灵活性和易扩展性脱颖而出,成为许多开发者首选的数据库操作工具。本文将深入探讨SQLAlchemy的核心概念、功能特点以及实战应用,帮助读者更好地理解和使用这一框架。
|
11天前
|
存储 JSON JavaScript
「Python系列」Python JSON数据解析
在Python中解析JSON数据通常使用`json`模块。`json`模块提供了将JSON格式的数据转换为Python对象(如列表、字典等)以及将Python对象转换为JSON格式的数据的方法。
28 0
|
12天前
|
分布式计算 资源调度 监控
Hadoop生态系统深度剖析:面试经验与必备知识点解析
本文深入探讨了Hadoop生态系统的面试重点,涵盖Hadoop架构、HDFS、YARN和MapReduce。了解Hadoop的主从架构、HDFS的读写流程及高级特性,YARN的资源管理与调度,以及MapReduce编程模型。通过代码示例,如HDFS文件操作和WordCount程序,帮助读者巩固理解。此外,文章强调在面试中应结合个人经验、行业动态和技术进展展示技术实力。
|
8月前
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
8月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
8月前
|
分布式计算 Hadoop 数据处理
Hadoop基础学习---6、MapReduce框架原理(二)
Hadoop基础学习---6、MapReduce框架原理(二)

热门文章

最新文章

推荐镜像

更多