《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.1.基于Elasticsearch实现预测系统(1) https://developer.aliyun.com/article/1226137
Prometheus 与 Elasticsearch 的数据同步
Prometheus 支持一种称为远程读写(Remote Read/Write)的功能,可以在 Prometheus 拉取监控数据时将数据写入到远程的数据源中,或者从远程数据源中读取监控数据做处理。
Prometheus 目前支持的远程数据源多达几十种,所有数据源都支持远程写入,但并不是所有数据源都同时支持远程读取,比如我们这里要使用的 Elasticsearch 就只支持远程写入。
注:7.10版本是支持的,可以通过 Metricbeat 对 Prometheus 进行远程拉取,参见:
https://www.elastic.co/guide/en/beats/metricbeat/current/metricbeat-metricset-prometheus-query.html
Prometheus 为远程读取和写入,分别定义了独立的通信与编码协议,第三方存储组件需要向
Prometheus 提供兼容该协议的 HTTP 接口地址。但由于众多第三方存储组件接收数据的方式并不相同,支持 Prometheus 读写协议的接口地址并不一定存在。为此,Prometheus 采用了适配器的方式屏蔽不同存储组件之间的差异,
如图1-2所示:
图1-2 Prometheus 远程读写
只要提供了适配器,Prometheus 读写数据就可以转换成第三方存储组件支持的协议和编码。
Elasticsearch 也需要使用适配器,这个适配器就是由 Elastic Stack 家族中的 Beat 组件担任。早期 Prometheus 官方推荐的适配器是 Infonova 开发的 Prometheusbeat,而在最新版本中已经转而推荐 Elastic Stack 官方的 Metricbeat。
无论是 Prometheusbeat 还是 Metricbeat,它们都可以向 Prometheus 开放 HTTP 监听地址,并且在接收到 Prometheus 监控数据后将它们存储到 Elasticsearch 中。Metricbeat 采用Module 的形式开启面向 Prometheus 的 HTTP 接口,具体配置如示例 1.1 所示:
- module: prometheus metricsets: ["remote_write"] host: "localhost" port: "9201"
示例1.1 Metricbeat 配置
添加示例所示配置内容后重启 Metricbeat,它就会在本机 9201端口开始监听 Prometheus 的写入数据。与此同时还要将这个监听地址告诉 Prometheus,这样它才知道在拉取到数据后向哪里写入。具体来说就是在 prometheus.yml中 添加如下内容:
remote_write: - url: "https://localhost:9201/write"
示例1.2 Prometheus 配置
使用 MetricBeat 组件保存监控数据,监控指标的标签(Label)会转换为 Elasticsearch 的文档字段。Metricbeat 还会附加一些 Metricbeat 自身相关的数据,所以文档中的字段会比实际监控指标的标签要多一些。
除了指标名称和标签以外,还包括主机地址、操作系统等信息,这可能会导致样本数据量急剧膨胀。但 Prometheus 并不支持在写入远程存储前过滤样本,所以只能通过 Beat 组件处理。
注:最高效的做法是在 metricbeat 上设置 drop_fields 的 processor,直接过滤,避免无效的网络传输。
比较理想的方法是将 Beat 组件的输出设置为 Logstash,然后再在 Logstash 中做数据过滤,最后再由 Logstash 转存到 Elasticsearch 中。
在我们的系统中,除了要过滤以上 Metricbeat 附加的字段以外,还需要组合、修正一些业务相关的字段。所以为了更好的处理数据,我们在整个数据系统中加入了 Logstash 组件。
最终整个数据系统的大致结构如图 1-3 所示:
Prometheus 在拉取到监控指标时,将数据同时发送给 Metricbeat 或 Prometheusbeat,它们在这里职责就是一个监控数据接收的端点。
Metricbeat 或 Prometheusbeat 再将数据发送给 Logstash,而 Logstash 则负责清洗和整理监控数据,并将它们存储到 Elasticsearch 中。我们将整个监控系统和数据系统部署在
Kubernetes 集群中,而最终用于机器学习的数据源 Elasticsearch 则被部署为独立的集群。
这样就将监控系统与数据分析系统隔离开来,从而保证了监控系统,不会受到大量数据分析运算的影响而出现故障。实际上图 1-3 就是对图 1-1中 所展示的监控系统、数据系统以及
Elasticsearch 集群的细化,但在实际应用中还有许多细节没有展现出来,需要读者根据项目实际情况做适当调整。
集成 Elasticsearch 与 Spark
事实上,新版 Kibana 组件中已经具备了非常强大的机器学习能力,但这项功能还未开放在基础授权中。此外我们的业务系统还有一些自身的特殊性,所以最终我们决定采用 Spark 编写代码实现这套智能预测系统。
一种显而易见的办法是利用 Elasticsearch 客户端接口,先将数据从 Elasticsearch 中读取出来再传给 Spark 进行分析处理。但 Spark 数据处理是先将任务分解成子任务,再将它们分发到不同计算节点上并行处理,以此提升海量数据分析处理的速度。而任务分解的同时是数据要分解,否则最终数据读取,就会成为所有子任务的瓶颈,这种分解后的数据在 Spark 中称为分区(Partition)。每个任务在各自的数据分区上运算处理,最终再将各自的处理结果按 Map/Reduce 的思想合并起来。
Elasticsearch 中的分片(Shard)刚好与 Spark 分区相契合,如果能让任务运算于分片之上,这无疑可以更充分地利用 Elasticsearch 存储特征。但如果是先从 Elasticsearch 读取数据再发送给Spark做处理,数据就是经由分片合并后的数据,Spark 在运算时就需要对数据再次进行分区。
Elasticsearch 包含一个开源的 elasticsearch-hadoop 项目,可以很好地解决数据分区问题。
elasticsearch-hadoop 可以与包括 Spark 在内的 Hadoop 生态良好集成,Spark 分区数据也可以与 Elasticsearch 分片数据直接对应起来。我们所要做只是将 elasticsearch-hadoop 的
Spark 依赖添加到项目中,并使用其提供的接口将数据从 Elasticsearch 中读取进来即可。智能预测系统采用 Java 语言开发,所以使用 Maven 引入如下依赖:
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-${spark.major.version}_${scala.version}</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency>
如示例 1.3 所示,elasticsearch-hadoop 的 Spark 依赖为 elasticsearch-spark。由于需要用到Spark ML 相关库,示例也将 spark-sql 和 spark-mllib 依赖也一并引入了进来。elasticsearch-spark 在 org.elasticsearch.spark.rdd.api.java 包中定义了类 JavaEsSpark,它提供了大量用于读取和写入 Elasticsearch 的静态方法。其中,静态方法 esRDD 用于从 Elasticsearch中读取索引,而 saveToEs 则用于向 Elasticsearch 中存储数据。
示例展示了从 Elasticsearch 中读取索引数据的代码片段:
List<Row> jobs = esRDD(sparkContext, metricIndex, queryString).values().map( map -> { Object[] values = new Object[featureFields.length+1]; for (int i = 0; i < featureFields.length; i++) { values[i] = map.get(featureFields[i]); } values[featureFields.length] = map.get(labelField); return RowFactory.create(values); } ).collect(); StructField[] structFields = new StructField[featureFields.length + 1]; for (int i = 0; i < featureFields.length; i++) { structFields[i] = new StructField(featureFields[i], DataTypes.StringType, true, Metadata.empty()); } structFields[featureFields.length] = new StructField(labelField, DataTypes.StringType, true, Metadata.empty()); StructType schema = new StructType(structFields); Dataset<Row> dataset = sparkSession.createDataFrame(jobs, schema);
示例1.4 使用 Spark 读入数据
示例中,sparkContext 是 SparkContext 实例,代表了与 Spark 运算环境相关的一些基本信息。MetricIndex 则是 esRDD 方法要读取文档数据的索引名称,而 queryString 则是检索文档的查询条件。esRDD 返回的类型为 JavaPairRDD,这是类似于一个 Pair 的集合。
每一个 Pair 代表索引中的一个文档,Pair 的 key 是字符串类型,代表了当前文档的标识符;而 value 则是一个 Map 类型,代表了整个文档数据。通过 JavaPairRDD 的 keys 方法将只返回所有文档的标识符,而通过 values 方法则会只返回所有文档的 Map 对象。
Map 的键为文档字段名称,而值则为文档字段对应的具体数值。示例中的代码就是通过调用
values 方法只获取文档的 Map 对象,然后再通过 Map 和 collect 方法将它们转换成 Row 的集合。
《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.1.基于Elasticsearch实现预测系统(3) https://developer.aliyun.com/article/1226131