使用python构建基于hadoop的mapreduce日志分析平台

简介:

出处:http://rfyiamcool.blog.51cto.com/1030776/1340057

          


224356461.jpg





流量比较大的日志要是直接写入Hadoop对Namenode负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入HDFS。 根据情况定期合成,写入到hdfs里面。

咱们看看日志的大小,200G的dns日志文件,我压缩到了18G,要是用awk perl当然也可以,但是处理速度肯定没有分布式那样的给力。

230102727.jpg


Hadoop Streaming原理

mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。


任何语言,只要是方便接收标准输入输出就可以做mapreduce~

再搞之前我们先简单测试下shell模拟mapreduce的性能速度~

234955396.jpg


看下他的结果,350M的文件用时35秒左右。

235045406.jpg


这是2G的日志文件,居然用了3分钟。 当然和我写的脚本也有问题,我们是模拟mapreduce的方式,而不是调用shell下牛逼的awk,gawk处理。


001056805.jpg

awk的速度 !果然很霸道,处理日志的时候,我也很喜欢用awk,只是学习的难度有点大,不像别的shell组件那么灵活简单。

000946258.jpg


这是官方的提供的两个demo ~

map.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
"" "A more advanced Mapper, using Python iterators and generators." ""
import sys
def read_input(file):
for line  in file:
# split the line into words
yield line.split()
def main(separator= '\t' ):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words  in data:
# write the results to STDOUT (standard output);
# what we output here will be the input  for the
# Reduce step, i.e. the input  for reducer.py
#
# tab-delimited; the trivial word count  is 1
for word  in words:
print  '%s%s%d' % (word, separator,  1 )
if __name__ ==  "__main__" :
main()


reduce.py的修改方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python
"" "A more advanced Reducer, using Python iterators and generators." ""
from itertools  import groupby
from operator  import itemgetter
import sys
def read_mapper_output(file, separator= '\t' ):
for line  in file:
yield line.rstrip().split(separator,  1 )
def main(separator= '\t' ):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
# groupby groups multiple word-count pairs by word,
# and creates an iterator that returns consecutive keys and their group:
#   current_word - string containing a word (the key)
#   group - iterator yielding all [ "<current_word>" "<count>" ] items
for current_word, group  in groupby(data, itemgetter( 0 )):
try :
total_count = sum( int (count)  for current_word, count  in group)
print  "%s%s%d" % (current_word, separator, total_count)
except ValueError:
# count was not a number, so silently discard  this item
pass
if __name__ ==  "__main__" :
main()


咱们再简单点:

1
2
3
4
5
6
7
#!/usr/bin/env python
import sys
for line  in sys.stdin:
line = line.strip()
words = line.split()
for word  in words:
print  '%s\t%s' % (word,  1 )


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
from operator  import itemgetter
import sys
current_word = None
current_count =  0
word = None
for line  in sys.stdin:
line = line.strip()
word, count = line.split( '\t' 1 )
try :
count =  int (count)
except ValueError:
continue
if current_word == word:
current_count += count
else :
if current_word:
print  '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print  '%s\t%s' % (current_word, current_count)


咱们就简单模拟下数据,跑个测试


084336884.jpg



剩下就没啥了,在hadoop集群环境下,运行hadoop的steaming.jar组件,加入mapreduce的脚本,指定输出就行了.  下面的例子我用的是shell的成分。


1
2
3
4
5
[root@ 101 cron]#$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc


详细的参数,对于咱们来说提供性能可以把tasks的任务数增加下,根据情况自己测试下,也别太高了,增加负担。

(1)-input:输入文件路径

(2)-output:输出文件路径

(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

(6)-partitioner:用户自定义的partitioner程序

(7)-combiner:用户自定义的combiner程序(必须用java实现)

(8)-D:作业的一些属性(以前用的是-jonconf),具体有:
           1)mapred.map.tasks:map task数目
           2)mapred.reduce.tasks:reduce task数目
           3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
            4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
            5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
            6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目


这里是统计dns的日志文件有多少行 ~

114512821.jpg

在mapreduce作为参数的时候,不能用太多太复杂的shell语言,他不懂的~

可以写成shell文件的模式;

1
2
3
4
5
6
7
8
#! /bin/bash
while read LINE;  do
#   for word  in $LINE
#   do
#    echo  "$word 1"
awk  '{print $5}'
done
done


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#! /bin/bash
count= 0
started= 0
word= ""
while read LINE; do
goodk=`echo $LINE | cut -d  ' ' -f  1 `
if "x" == x "$goodk" ];then
continue
fi
if "$word" !=  "$goodk" ];then
[ $started -ne  0 ] && echo -e  "$word\t$count"
word=$goodk                                                                                                                  
count= 1
started= 1
else
count=$(( $count +  1 ))
fi
done


有时候会出现这样的问题,好好看看自己写的mapreduce程序 ~

13/12/14 13:26:52 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030

13/12/14 13:26:53 INFO streaming.StreamJob:  map 0%  reduce 0%

13/12/14 13:27:16 INFO streaming.StreamJob:  map 100%  reduce 100%

13/12/14 13:27:16 INFO streaming.StreamJob: To kill this job, run:

13/12/14 13:27:16 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201312131904_0030

13/12/14 13:27:16 INFO streaming.StreamJob: Tracking URL: http://101.rui.com:50030/jobdetails.jsp?jobid=job_201312131904_0030

13/12/14 13:27:16 ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201312131904_0030_m_000000

13/12/14 13:27:16 INFO streaming.StreamJob: killJob...

Streaming Command Failed!


python做为mapreduce执行成功后,结果和日志一般是放在你指定的目录下的,结果是在part-00000文件里面~


172550144.jpg

本文转自大角牛博客51CTO博客,原文链接http://blog.51cto.com/jingshengsun888/1341123如需转载请自行联系原作者


运维的戏子

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
2月前
|
数据可视化 关系型数据库 MySQL
基于python大数据的的海洋气象数据可视化平台
针对海洋气象数据量大、维度多的挑战,设计基于ECharts的可视化平台,结合Python、Django与MySQL,实现数据高效展示与交互分析,提升科研与决策效率。
|
4月前
|
API 开发工具 开发者
【干货满满】电商平台API接口用python调用脚本
这是一个支持淘宝、京东、拼多多、亚马逊等主流电商平台的通用 API 调用 Python 脚本框架,适配 doubao 使用。脚本封装了签名验证、请求处理、异常捕获及限流控制等核心功能,提供统一接口调用方式,便于开发者快速集成与扩展。
|
5月前
|
缓存 监控 API
1688平台开放接口实战:如何通过API获取店铺所有商品数据(Python示列)
本文介绍如何通过1688开放平台API接口获取店铺所有商品,涵盖准备工作、接口调用及Python代码实现,适用于商品同步与数据监控场景。
|
存储 运维 监控
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
852 6
|
9月前
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
9月前
|
机器学习/深度学习 数据采集 数据可视化
Python/Anaconda双方案加持!Jupyter Notebook全平台下载教程来袭
Jupyter Notebook 是一款交互式编程与数据科学分析工具,支持40多种编程语言,广泛应用于机器学习、数据清洗和学术研究。其核心优势包括实时执行代码片段、支持Markdown文档与LaTeX公式混排,并可导出HTML/PDF/幻灯片等格式。本文详细介绍了Jupyter Notebook的软件定位、特性、安装方案(Anaconda集成环境与原生Python+PIP安装)、首次运行配置及常见问题解决方案,帮助用户快速上手并高效使用该工具。
|
9月前
|
存储 监控 API
1688平台API接口实战:Python实现店铺全量商品数据抓取
本文介绍如何使用Python通过1688开放平台的API接口自动化抓取店铺所有商品数据。首先,开发者需在1688开放平台完成注册并获取App Key和App Secret,申请“商品信息查询”权限。接着,利用`alibaba.trade.product.search4trade`接口,构建请求参数、生成MD5签名,并通过分页机制获取全量商品数据。文中详细解析了响应结构、存储优化及常见问题处理方法,还提供了竞品监控、库存预警等应用场景示例和完整代码。
|
11月前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
627 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
前端开发 数据可视化 API
Python实现智能家居设备的统一控制平台
【10月更文挑战第6天】 Python实现智能家居设备的统一控制平台
750 11

热门文章

最新文章

推荐镜像

更多