没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现

前言

大家好,我是明哥!

在前段时间的一篇博文中,笔者分析了 flink standalone 模式且不能使用 hdfs 场景下的各种问题及其应对方案,当时明确指出,在只能使用本地文件系统的情况下,flink job manager 是没有办法做到 HA 高可用的,因为没有一个分布式共享存储来提供多个job manager需要共享的状态信息,如已经提交的作业的JobGraph等信息。

michaelli:flink standalone 部署模式且不能使用 hdfs 场景下的各种问题及其应对方案4 赞同 · 3 评论文章

所以以上场景对监控与报警机制提出了较高的要求。基于此,引出本片博文,专门讨论监控与告警体系的重要性,并给出两个具体的实现脚本。

以下是正文。

监控与告警体系的重要性

绝大多数公司的生产环境,都有完善的监控与告警机制。

完善的监控体系,配合上合适的告警机制如邮件短信甚至电话告警,一方面能大大减轻公司的运维成本和运维人员的工作负担;另一方面,配合上一些分析和算法,还能提前做好预警防患于未然,使得大数据平台更加健康。

可以说,没有完善的监控与告警机制,就相当于在茫茫大海中裸泳,随时可能碰到各种异常,相关人员时时处于提心吊胆的状态。

常见监控项

笔者大概总结了下,主要的监控项分为以下几大类:

  • 物理服务器层面的监控,如网络连通性磁盘IO磁盘容量内存使用率cpu使用率等等;
  • 大数据集群服务的监控,如hdfs/yarn/hive/hbase 等服务的使用率和健康度等;
  • 大数据作业的监控,如运行状态,失败次数,运行时长等。

而大数据流处理作业相对于大数据批处理作业,更强调时效性,更加重视作业的健康度和异常状态下的快速响应和快速回复,所以更加需要作业级别的完善的监控与告警机制。

笔者在此给出两个作业监控脚本的具体实现,一个是针对flink standalone模式下的流处理作业的监控,一个是针对 spark streaming on yarn模式下的流处理作业的监控,(其它模式可以参考这两个脚本略作修改即可)大家简单修改下对接上自己的报警系统,就可以使用了。

flink standalone 模式下的流处理作业监控脚本

该监控脚本的监控机制如下:

  • 通过访问 flink jobmanager 提供的多个 restful url 来获得集群状态信息,并使用 jq 进行解析;
  • 将以上解析获得的集群状态信息,与用户传入的期望的集群状态进行对比,从而实现监控,并触发必要的告警机制;
  • 用户传入参数,包括 job manager 相关 restful url地址 (其实只要传入jobmanager的ip和port,即可拼接获得对应的多个restful url地址),期望的集群中task manager的个数,以及欲监控的 flink 作业的名称;

具体的监控脚本如下:

#!/usr/bin/env bash`
#overviewUrl="http://10.20.39.42:8081/overview"
#jobsOverviewUrl="http://10.20.39.42:8081/jobs/overview"
#expectedNumOfTm=2
#jobName="Socket Window WordCount"
overviewUrl=$1
jobsOverviewUrl=$2
expectedNumOfTm=$3
jobName=$4
#check flink job manager state and exit if not good
overviewResult=`curl -s -X GET "$overviewUrl"`
if [ -z "$overviewResult" ]
then
 echo "flink job manager is not running properly, exit!"
 exit 1
else
 echo "flink job manager is in good health."
fi
#check flink task manager state and exit if not good
numOfTm=`echo $overviewResult | jq .taskmanagers`
if [ "$numOfTm" -ne "$expectedNumOfTm" ]; 
then 
 echo "number of living task manager is not as expected, exit!"
 exit 2
else 
 echo "flink task manager is in good state."
fi
#check flink job state and exit if not good
jobsOverviewResult=`curl -s -X GET "$jobsOverviewUrl"`
jobState=`echo $jobsOverviewResult | jq --arg jobName "$jobName" -r '.jobs[] | select(.name == $jobName and .state == "RUNNING")'`
if [ -z "$jobState" ]
then
 echo "there are no jobs with name $jobName in running state, exit!"
 exit 3
else
 echo "flink job $jobName is in running state."
fi

spark streaming on yarn 模式下流处理作业监控脚本

该监控脚本的监控机制如下:

  • 通过访问 yarn 提供的 restful url 来获得yarn 集群中处于 running 状态的 spark 作业列表;
  • 使用 jq 解析以上处于 running 状态的 spark 作业列表信息,判断是否有用户指定名称的 spark streaming 作业处于 running 状态,如果没有的话,触发告警推出;如果有的话,解析获得该spark streming 作业的restful url;
  • 进一步访问以上 spark streaming 作业的 restful url,获取作业执行细节包括 batch duration 和已执行完毕的 batches,然后 sleep 一段时间 (batch duration *2)后,再次访问以上 restful url 获取新的完成的 batches,对比两次获得的batches并进行比较,一次判断作业使用处于健康状态;
  • 用户传入的参数,包括 yarn resource mnager地址,和欲监控的 spark streaming 作业的名称;

具体的监控脚本如下:

#!/bin/bash
# specify input paramaters here, which are yarn resourcemanger address, and the spark streaming application name you want to monitor
#params=(192.168.71.69:8088 spark-streaming-app-name)
params=("$@")
yarnRm=${params[0]}
appName=${params[1]}
#echo "the input yarn rm is;" $yarnRm
#echo "the input appName to be monitored is: " $appName
#get application id and tracking url from yarn resource manager
yarnRestUrl="http://$yarnRm/ws/v1/cluster/apps?states=RUNNING&applicationTypes=spark"
yarnAppDetails=`curl -s "$yarnRestUrl"`
#echo "yarnRestUrl is: " $yarnRestUrl
#echo "yarnAppDetails is: " $yarnAppDetails
# if no spark application is in running state, exit with code 1
if [ -z "$yarnAppDetails" ]
then
 #echo "no spark application is in running status, exit!"
 echo "2"
 exit 1
fi
yarnAppId=`echo $yarnAppDetails | jq --arg appName "$appName" -r '.apps | select(.app[].name == $appName) | .app[0].id'`
yarnAppTrackingUrl=`echo $yarnAppDetails | jq --arg appName "$appName" -r '.apps | select(.app[].name == $appName) | .app[0].trackingUrl'`
#echo "yarnAppId is: " $yarnAppId
#echo "yarn application tracking url is: " $yarnAppTrackingUrl
#if no spark application with specified name is in running state, exit
if [ -z "$yarnAppId" ]
then
 #echo "no spark application with name $appName is in running status, exit!"
 echo "2"
 exit 1
fi
#get application completed batches from spark
sparkRestUrl="$yarnAppTrackingUrl/api/v1/applications/$yarnAppId/streaming/statistics"
sparkAppDetails=`curl -s "$sparkRestUrl"`
sparkBatchDuration=`echo $sparkAppDetails | jq .batchDuration`
sparkCompletedBatchesPrev=`echo $sparkAppDetails | jq .numTotalCompletedBatches`
sparkCheckInterval=$((sparkBatchDuration/1000*2))
#echo "spark restful url is: " $sparkRestUrl
#echo $sparkAppDetails
#echo "spark batch duration is: " $sparkBatchDuration
#echo $sparkCompletedBatchesPrev
#echo $sparkCheckInterval
 sleep $sparkCheckInterval
 sparkAppDetails=`curl -s "$sparkRestUrl"`
 sparkCompletedBatchesCurrent=`echo $sparkAppDetails | jq .numTotalCompletedBatches`
 if [ "$sparkCompletedBatchesCurrent" -le "$sparkCompletedBatchesPrev" ]
 then
     # spark streaming 程序在2个 batch duration 内1个 batch 都没有完成,处于不健康状态,退出!
     #echo "spark streaming application $appName is in bad health!!! sparkCompletedBatchesPrev is: $sparkCompletedBatchesPrev, sparkCompletedBatchesCurrent is $sparkCompletedBatchesCurrent"
     echo "1"
     exit 1 
 else
     #echo "spark streaming application $appName is in good health!!! sparkCompletedBatchesPrev is: $sparkCompletedBatchesPrev, sparkCompletedBatchesCurrent is $sparkCompletedBatchesCurrent"
     echo "0"
 fi


上述监控脚本的局限性

最后,有必要指出下,上述针对 spark/flink 流处理作业进行监控的脚本,有其局限性,只适用于少量的关键的作业的关键指标。一旦需要监控的作业比较多,需要监控的指标也比较多时,就会因为需要针对不同的作业及不同的指标写多个脚本造成监控脚本数量的膨胀,此时这些监控脚本会比较难管理,而且监控指标的查看也不是很直观。

所以当需要监控的作业量比较多时,当需要监控的指标比较多时,当需要比较直观的查看监控指标时,我们推荐用平台化的工具去做指标监控和告警系统。

flink/spark在框架层面都支持将运行时的各种指标 metrix sink 到 prometheus, c此时再配合上 grafana 监控大盘进行展示,配合上 alert manager 打通告警功能,一套完善的作业监控和告警系统就初具规模了。

也有读者提到了滴滴开源的夜莺企业级监控方案,笔者还没有尝试过,不过从大家的评价来看还是不错的,小伙伴们可以关注下。https://github.com/didi/nightin

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
283 1
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
69 1
|
4月前
|
SQL 分布式计算 监控
|
4月前
|
分布式计算 并行计算 数据处理
|
4月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
57 0
|
4月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
84 0
|
4月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
299 0
|
4月前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
87 0
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
下一篇
DataWorks