《深入理解Spark:核心思想与源码分析》——3.9节启动测量系统MetricsSystem

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.9节启动测量系统MetricsSystem,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.9 启动测量系统MetricsSystem
MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:
Instance:指定了谁在使用测量系统;
Source:指定了从哪里收集测量数据;
Sink:指定了往哪里输出测量数据。
Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。
Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。
Spark中使用MetricsServlet作为默认的Sink。
MetricsSystem的启动代码如下。
val metricsSystem = env.metricsSystem

metricsSystem.start()

MetricsSystem的启动过程包括以下步骤:
1)注册Sources;
2)注册Sinks;
3)给Sinks增加Jetty的ServletContextHandler。
MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attach-Handler将它们绑定到Spark UI上。
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler (handler)))

3.9.1 注册Sources
registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册Sources的过程分为以下步骤:
1)从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。
2)用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]。
3)将每个source的metricRegistry(也是MetricSet的子类型)注册到Concurrent-Map metrics。这里的registerSource方法已在3.8.2节讲解过。
代码清单3-45 MetricsSystem注册Sources的实现

private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
        val classPath = kv._2.getProperty("class")
        try {
            val source = Class.forName(classPath).newInstance()
            registerSource(source.asInstanceOf[Source])
        } catch {
            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
        }
    }
}

3.9.2 注册Sinks
registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,它的实现见代码清单3-46。注册Sinks的步骤如下:
1)从Driver的Properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json},将其转换为Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。
2)将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
代码清单3-46 MetricsSystem注册Sinks的实现

private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    sinkConfigs.foreach { kv =>
        val classPath = kv._2.getProperty("class")
        if (null != classPath) {
            try {
                val sink = Class.forName(classPath)
                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
                .newInstance(kv._2, registry, securityMgr)
            if (kv._1 == "servlet") {
                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
            } else {
                sinks += sink.asInstanceOf[Sink]
            }
            } catch {
                case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)
            }
        }
    }
}

3.9.3 给Sinks增加Jetty的ServletContextHandler
为了能够在SparkUI(网页)访问到测量数据,所以需要给Sinks增加Jetty的Servlet-ContextHandler,这里主要用到MetricsSystem的getServletHandlers方法实现如下。

def getServletHandlers = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers).getOrElse(Array())
}
可以看到调用了metricsServlet的getHandlers,其实现如下。
def getHandlers = Array[ServletContextHandler](
    createServletHandler(servletPath,
        new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
)
最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由get-MetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI(creatServlethandler与attachHandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。
http://localhost:4040/metrics/applications/json。
http://localhost:4040/metrics/json。
http://localhost:4040/metrics/master/json。
相关文章
|
7月前
|
SQL 分布式计算 Java
Apache IoTDB开发系统整合之Spark IoTDB Connecter
以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。
108 0
|
3月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
4月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
52 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
4月前
|
分布式计算 资源调度 大数据
【大数据技术Hadoop+Spark】Spark架构、原理、优势、生态系统等讲解(图文解释)
【大数据技术Hadoop+Spark】Spark架构、原理、优势、生态系统等讲解(图文解释)
208 0
|
9月前
|
机器学习/深度学习 存储 分布式计算
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用
|
9月前
|
分布式计算 Hadoop Java
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
|
11月前
|
机器学习/深度学习 分布式计算 搜索推荐
基于Spark的电影推荐系统实现
基于Spark的电影推荐系统实现
|
SQL JSON 分布式计算
Spark Sql系统入门4:spark应用程序中使用spark sql
Spark Sql系统入门4:spark应用程序中使用spark sql
113 0
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
105 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Hadoop 大数据
大数据编程实验一:HDFS常用操作和Spark读取文件系统数据
大数据编程实验,利用本地搭建的伪分布式集群进行HDFS常用操作和Spark读取文件系统数据的操作。
743 1
大数据编程实验一:HDFS常用操作和Spark读取文件系统数据