前言
大家好,我是明哥!
在前段时间的一篇博文中,笔者分析了 flink standalone 模式且不能使用 hdfs 场景下的各种问题及其应对方案,当时明确指出,在只能使用本地文件系统的情况下,flink job manager 是没有办法做到 HA 高可用的,因为没有一个分布式共享存储来提供多个job manager需要共享的状态信息,如已经提交的作业的JobGraph等信息。
michaelli:flink standalone 部署模式且不能使用 hdfs 场景下的各种问题及其应对方案4 赞同 · 3 评论文章
所以以上场景对监控与报警机制提出了较高的要求。基于此,引出本片博文,专门讨论监控与告警体系的重要性,并给出两个具体的实现脚本。
以下是正文。
监控与告警体系的重要性
绝大多数公司的生产环境,都有完善的监控与告警机制。
完善的监控体系,配合上合适的告警机制如邮件短信甚至电话告警,一方面能大大减轻公司的运维成本和运维人员的工作负担;另一方面,配合上一些分析和算法,还能提前做好预警防患于未然,使得大数据平台更加健康。
可以说,没有完善的监控与告警机制,就相当于在茫茫大海中裸泳,随时可能碰到各种异常,相关人员时时处于提心吊胆的状态。
常见监控项
笔者大概总结了下,主要的监控项分为以下几大类:
- 物理服务器层面的监控,如网络连通性磁盘IO磁盘容量内存使用率cpu使用率等等;
- 大数据集群服务的监控,如hdfs/yarn/hive/hbase 等服务的使用率和健康度等;
- 大数据作业的监控,如运行状态,失败次数,运行时长等。
而大数据流处理作业相对于大数据批处理作业,更强调时效性,更加重视作业的健康度和异常状态下的快速响应和快速回复,所以更加需要作业级别的完善的监控与告警机制。
笔者在此给出两个作业监控脚本的具体实现,一个是针对flink standalone模式下的流处理作业的监控,一个是针对 spark streaming on yarn模式下的流处理作业的监控,(其它模式可以参考这两个脚本略作修改即可)大家简单修改下对接上自己的报警系统,就可以使用了。
flink standalone 模式下的流处理作业监控脚本
该监控脚本的监控机制如下:
- 通过访问 flink jobmanager 提供的多个 restful url 来获得集群状态信息,并使用 jq 进行解析;
- 将以上解析获得的集群状态信息,与用户传入的期望的集群状态进行对比,从而实现监控,并触发必要的告警机制;
- 用户传入参数,包括 job manager 相关 restful url地址 (其实只要传入jobmanager的ip和port,即可拼接获得对应的多个restful url地址),期望的集群中task manager的个数,以及欲监控的 flink 作业的名称;
具体的监控脚本如下:
#!/usr/bin/env bash` #overviewUrl="http://10.20.39.42:8081/overview" #jobsOverviewUrl="http://10.20.39.42:8081/jobs/overview" #expectedNumOfTm=2 #jobName="Socket Window WordCount" overviewUrl=$1 jobsOverviewUrl=$2 expectedNumOfTm=$3 jobName=$4 #check flink job manager state and exit if not good overviewResult=`curl -s -X GET "$overviewUrl"` if [ -z "$overviewResult" ] then echo "flink job manager is not running properly, exit!" exit 1 else echo "flink job manager is in good health." fi #check flink task manager state and exit if not good numOfTm=`echo $overviewResult | jq .taskmanagers` if [ "$numOfTm" -ne "$expectedNumOfTm" ]; then echo "number of living task manager is not as expected, exit!" exit 2 else echo "flink task manager is in good state." fi #check flink job state and exit if not good jobsOverviewResult=`curl -s -X GET "$jobsOverviewUrl"` jobState=`echo $jobsOverviewResult | jq --arg jobName "$jobName" -r '.jobs[] | select(.name == $jobName and .state == "RUNNING")'` if [ -z "$jobState" ] then echo "there are no jobs with name $jobName in running state, exit!" exit 3 else echo "flink job $jobName is in running state." fi
spark streaming on yarn 模式下流处理作业监控脚本
该监控脚本的监控机制如下:
- 通过访问 yarn 提供的 restful url 来获得yarn 集群中处于 running 状态的 spark 作业列表;
- 使用 jq 解析以上处于 running 状态的 spark 作业列表信息,判断是否有用户指定名称的 spark streaming 作业处于 running 状态,如果没有的话,触发告警推出;如果有的话,解析获得该spark streming 作业的restful url;
- 进一步访问以上 spark streaming 作业的 restful url,获取作业执行细节包括 batch duration 和已执行完毕的 batches,然后 sleep 一段时间 (batch duration *2)后,再次访问以上 restful url 获取新的完成的 batches,对比两次获得的batches并进行比较,一次判断作业使用处于健康状态;
- 用户传入的参数,包括 yarn resource mnager地址,和欲监控的 spark streaming 作业的名称;
具体的监控脚本如下:
#!/bin/bash # specify input paramaters here, which are yarn resourcemanger address, and the spark streaming application name you want to monitor #params=(192.168.71.69:8088 spark-streaming-app-name) params=("$@") yarnRm=${params[0]} appName=${params[1]} #echo "the input yarn rm is;" $yarnRm #echo "the input appName to be monitored is: " $appName #get application id and tracking url from yarn resource manager yarnRestUrl="http://$yarnRm/ws/v1/cluster/apps?states=RUNNING&applicationTypes=spark" yarnAppDetails=`curl -s "$yarnRestUrl"` #echo "yarnRestUrl is: " $yarnRestUrl #echo "yarnAppDetails is: " $yarnAppDetails # if no spark application is in running state, exit with code 1 if [ -z "$yarnAppDetails" ] then #echo "no spark application is in running status, exit!" echo "2" exit 1 fi yarnAppId=`echo $yarnAppDetails | jq --arg appName "$appName" -r '.apps | select(.app[].name == $appName) | .app[0].id'` yarnAppTrackingUrl=`echo $yarnAppDetails | jq --arg appName "$appName" -r '.apps | select(.app[].name == $appName) | .app[0].trackingUrl'` #echo "yarnAppId is: " $yarnAppId #echo "yarn application tracking url is: " $yarnAppTrackingUrl #if no spark application with specified name is in running state, exit if [ -z "$yarnAppId" ] then #echo "no spark application with name $appName is in running status, exit!" echo "2" exit 1 fi #get application completed batches from spark sparkRestUrl="$yarnAppTrackingUrl/api/v1/applications/$yarnAppId/streaming/statistics" sparkAppDetails=`curl -s "$sparkRestUrl"` sparkBatchDuration=`echo $sparkAppDetails | jq .batchDuration` sparkCompletedBatchesPrev=`echo $sparkAppDetails | jq .numTotalCompletedBatches` sparkCheckInterval=$((sparkBatchDuration/1000*2)) #echo "spark restful url is: " $sparkRestUrl #echo $sparkAppDetails #echo "spark batch duration is: " $sparkBatchDuration #echo $sparkCompletedBatchesPrev #echo $sparkCheckInterval sleep $sparkCheckInterval sparkAppDetails=`curl -s "$sparkRestUrl"` sparkCompletedBatchesCurrent=`echo $sparkAppDetails | jq .numTotalCompletedBatches` if [ "$sparkCompletedBatchesCurrent" -le "$sparkCompletedBatchesPrev" ] then # spark streaming 程序在2个 batch duration 内1个 batch 都没有完成,处于不健康状态,退出! #echo "spark streaming application $appName is in bad health!!! sparkCompletedBatchesPrev is: $sparkCompletedBatchesPrev, sparkCompletedBatchesCurrent is $sparkCompletedBatchesCurrent" echo "1" exit 1 else #echo "spark streaming application $appName is in good health!!! sparkCompletedBatchesPrev is: $sparkCompletedBatchesPrev, sparkCompletedBatchesCurrent is $sparkCompletedBatchesCurrent" echo "0" fi
上述监控脚本的局限性
最后,有必要指出下,上述针对 spark/flink 流处理作业进行监控的脚本,有其局限性,只适用于少量的关键的作业的关键指标。一旦需要监控的作业比较多,需要监控的指标也比较多时,就会因为需要针对不同的作业及不同的指标写多个脚本造成监控脚本数量的膨胀,此时这些监控脚本会比较难管理,而且监控指标的查看也不是很直观。
所以当需要监控的作业量比较多时,当需要监控的指标比较多时,当需要比较直观的查看监控指标时,我们推荐用平台化的工具去做指标监控和告警系统。
flink/spark在框架层面都支持将运行时的各种指标 metrix sink 到 prometheus, c此时再配合上 grafana 监控大盘进行展示,配合上 alert manager 打通告警功能,一套完善的作业监控和告警系统就初具规模了。
也有读者提到了滴滴开源的夜莺企业级监控方案,笔者还没有尝试过,不过从大家的评价来看还是不错的,小伙伴们可以关注下。https://github.com/didi/nightin