《深入理解Spark:核心思想与源码分析》——3.9节启动测量系统MetricsSystem

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.9节启动测量系统MetricsSystem,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.9 启动测量系统MetricsSystem
MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:
Instance:指定了谁在使用测量系统;
Source:指定了从哪里收集测量数据;
Sink:指定了往哪里输出测量数据。
Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。
Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。
Spark中使用MetricsServlet作为默认的Sink。
MetricsSystem的启动代码如下。
val metricsSystem = env.metricsSystem

metricsSystem.start()

MetricsSystem的启动过程包括以下步骤:
1)注册Sources;
2)注册Sinks;
3)给Sinks增加Jetty的ServletContextHandler。
MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attach-Handler将它们绑定到Spark UI上。
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler (handler)))

3.9.1 注册Sources
registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册Sources的过程分为以下步骤:
1)从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。
2)用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]。
3)将每个source的metricRegistry(也是MetricSet的子类型)注册到Concurrent-Map metrics。这里的registerSource方法已在3.8.2节讲解过。
代码清单3-45 MetricsSystem注册Sources的实现

private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
        val classPath = kv._2.getProperty("class")
        try {
            val source = Class.forName(classPath).newInstance()
            registerSource(source.asInstanceOf[Source])
        } catch {
            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
        }
    }
}

3.9.2 注册Sinks
registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,它的实现见代码清单3-46。注册Sinks的步骤如下:
1)从Driver的Properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json},将其转换为Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。
2)将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
代码清单3-46 MetricsSystem注册Sinks的实现

private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    sinkConfigs.foreach { kv =>
        val classPath = kv._2.getProperty("class")
        if (null != classPath) {
            try {
                val sink = Class.forName(classPath)
                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
                .newInstance(kv._2, registry, securityMgr)
            if (kv._1 == "servlet") {
                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
            } else {
                sinks += sink.asInstanceOf[Sink]
            }
            } catch {
                case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)
            }
        }
    }
}

3.9.3 给Sinks增加Jetty的ServletContextHandler
为了能够在SparkUI(网页)访问到测量数据,所以需要给Sinks增加Jetty的Servlet-ContextHandler,这里主要用到MetricsSystem的getServletHandlers方法实现如下。

def getServletHandlers = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers).getOrElse(Array())
}
可以看到调用了metricsServlet的getHandlers,其实现如下。
def getHandlers = Array[ServletContextHandler](
    createServletHandler(servletPath,
        new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
)
最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由get-MetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI(creatServlethandler与attachHandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。
http://localhost:4040/metrics/applications/json。
http://localhost:4040/metrics/json。
http://localhost:4040/metrics/master/json。
相关文章
|
SQL 分布式计算 Java
Apache IoTDB开发系统整合之Spark IoTDB Connecter
以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。
286 0
|
5月前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
397 0
|
5月前
|
分布式计算 Java Linux
【Deepin 20系统】Linux 系统安装Spark教程及使用
在Deepin 20系统上安装和使用Apache Spark的详细教程,包括安装Java JDK、下载和解压Spark安装包、配置环境变量和Spark配置文件、启动和关闭Spark集群的步骤,以及使用Spark Shell和PySpark进行简单操作的示例。
94 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
基于Spark中随机森林模型的天气预测系统
基于Spark中随机森林模型的天气预测系统
176 1
|
8月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
242 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
8月前
|
分布式计算 资源调度 大数据
【大数据技术Hadoop+Spark】Spark架构、原理、优势、生态系统等讲解(图文解释)
【大数据技术Hadoop+Spark】Spark架构、原理、优势、生态系统等讲解(图文解释)
1566 1
|
8月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
存储 分布式计算 Hadoop
Apache IoTDB开发系统整合之TsFile-Spark-Connector
TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。
141 1
|
机器学习/深度学习 分布式计算 搜索推荐
基于Spark的电影推荐系统实现
基于Spark的电影推荐系统实现
|
机器学习/深度学习 存储 分布式计算
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用