带你读《Elastic Stack 实战手册》之75:——4.2.1.基于Elasticsearch实现预测系统(2)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 带你读《Elastic Stack 实战手册》之75:——4.2.1.基于Elasticsearch实现预测系统(2)

《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.1.基于Elasticsearch实现预测系统(1) https://developer.aliyun.com/article/1226137



Prometheus 与 Elasticsearch 的数据同步

 

Prometheus 支持一种称为远程读写(Remote Read/Write)的功能,可以在 Prometheus 拉取监控数据时将数据写入到远程的数据源中,或者从远程数据源中读取监控数据做处理。

 

Prometheus 目前支持的远程数据源多达几十种,所有数据源都支持远程写入,但并不是所有数据源都同时支持远程读取,比如我们这里要使用的 Elasticsearch 就只支持远程写入。


注:7.10版本是支持的,可以通过 Metricbeat 对 Prometheus 进行远程拉取,参见:

https://www.elastic.co/guide/en/beats/metricbeat/current/metricbeat-metricset-prometheus-query.html

 

Prometheus 为远程读取和写入,分别定义了独立的通信与编码协议,第三方存储组件需要向

Prometheus 提供兼容该协议的 HTTP 接口地址。但由于众多第三方存储组件接收数据的方式并不相同,支持 Prometheus 读写协议的接口地址并不一定存在。为此,Prometheus 采用了适配器的方式屏蔽不同存储组件之间的差异,

 

如图1-2所示:

image.png


图1-2 Prometheus 远程读写


只要提供了适配器,Prometheus 读写数据就可以转换成第三方存储组件支持的协议和编码。

Elasticsearch 也需要使用适配器,这个适配器就是由 Elastic Stack 家族中的 Beat 组件担任。早期 Prometheus 官方推荐的适配器是 Infonova 开发的 Prometheusbeat,而在最新版本中已经转而推荐 Elastic Stack 官方的 Metricbeat。

 

无论是 Prometheusbeat 还是 Metricbeat,它们都可以向 Prometheus 开放 HTTP 监听地址,并且在接收到 Prometheus 监控数据后将它们存储到 Elasticsearch 中。Metricbeat 采用Module 的形式开启面向 Prometheus 的 HTTP 接口,具体配置如示例 1.1 所示:


- module: prometheus
  metricsets: ["remote_write"]
  host: "localhost"
  port: "9201"

示例1.1 Metricbeat 配置


添加示例所示配置内容后重启 Metricbeat,它就会在本机 9201端口开始监听 Prometheus 的写入数据。与此同时还要将这个监听地址告诉 Prometheus,这样它才知道在拉取到数据后向哪里写入。具体来说就是在 prometheus.yml中 添加如下内容:

 

remote_write:
  - url: "https://localhost:9201/write"

示例1.2 Prometheus 配置


使用 MetricBeat 组件保存监控数据,监控指标的标签(Label)会转换为 Elasticsearch 的文档字段。Metricbeat 还会附加一些 Metricbeat 自身相关的数据,所以文档中的字段会比实际监控指标的标签要多一些。

 

除了指标名称和标签以外,还包括主机地址、操作系统等信息,这可能会导致样本数据量急剧膨胀。但 Prometheus 并不支持在写入远程存储前过滤样本,所以只能通过 Beat 组件处理。

 

注:最高效的做法是在 metricbeat 上设置 drop_fields 的 processor,直接过滤,避免无效的网络传输 

 

比较理想的方法是将 Beat 组件的输出设置为 Logstash,然后再在 Logstash 中做数据过滤,最后再由 Logstash 转存到 Elasticsearch 中。

在我们的系统中,除了要过滤以上 Metricbeat 附加的字段以外,还需要组合、修正一些业务相关的字段。所以为了更好的处理数据,我们在整个数据系统中加入了 Logstash 组件

 

最终整个数据系统的大致结构如图 1-3 所示:

image.png


Prometheus 在拉取到监控指标时,将数据同时发送给 Metricbeat 或 Prometheusbeat,它们在这里职责就是一个监控数据接收的端点。

 

Metricbeat 或 Prometheusbeat 再将数据发送给 Logstash,而 Logstash 则负责清洗和整理监控数据,并将它们存储到 Elasticsearch 中。我们将整个监控系统和数据系统部署在

Kubernetes 集群中,而最终用于机器学习的数据源 Elasticsearch 则被部署为独立的集群。

 

这样就将监控系统与数据分析系统隔离开来,从而保证了监控系统,不会受到大量数据分析运算的影响而出现故障。实际上图 1-3 就是对图 1-1中 所展示的监控系统、数据系统以及

Elasticsearch 集群的细化,但在实际应用中还有许多细节没有展现出来,需要读者根据项目实际情况做适当调整。

 

集成 Elasticsearch 与 Spark

 

事实上,新版 Kibana 组件中已经具备了非常强大的机器学习能力,但这项功能还未开放在基础授权中。此外我们的业务系统还有一些自身的特殊性,所以最终我们决定采用 Spark 编写代码实现这套智能预测系统。

 

一种显而易见的办法是利用 Elasticsearch 客户端接口,先将数据从 Elasticsearch 中读取出来再传给 Spark 进行分析处理。但 Spark 数据处理是先将任务分解成子任务,再将它们分发到不同计算节点上并行处理,以此提升海量数据分析处理的速度。而任务分解的同时是数据要分解,否则最终数据读取,就会成为所有子任务的瓶颈,这种分解后的数据在 Spark 中称为分区(Partition)。每个任务在各自的数据分区上运算处理,最终再将各自的处理结果按 Map/Reduce 的思想合并起来。


Elasticsearch 中的分片(Shard)刚好与 Spark 分区相契合,如果能让任务运算于分片之上,这无疑可以更充分地利用 Elasticsearch 存储特征。但如果是先从 Elasticsearch 读取数据再发送给Spark做处理,数据就是经由分片合并后的数据,Spark 在运算时就需要对数据再次进行分区。

 

Elasticsearch 包含一个开源的 elasticsearch-hadoop 项目,可以很好地解决数据分区问题。

elasticsearch-hadoop 可以与包括 Spark 在内的 Hadoop 生态良好集成,Spark 分区数据也可以与 Elasticsearch 分片数据直接对应起来。我们所要做只是将 elasticsearch-hadoop 的

Spark 依赖添加到项目中,并使用其提供的接口将数据从 Elasticsearch 中读取进来即可。智能预测系统采用 Java 语言开发,所以使用 Maven 引入如下依赖:


 <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-${spark.major.version}_${scala.version}</artifactId>
      <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

如示例 1.3 所示,elasticsearch-hadoop 的 Spark 依赖为 elasticsearch-spark。由于需要用到Spark ML 相关库,示例也将 spark-sql 和 spark-mllib 依赖也一并引入了进来。elasticsearch-spark 在 org.elasticsearch.spark.rdd.api.java 包中定义了类 JavaEsSpark,它提供了大量用于读取和写入 Elasticsearch 的静态方法。其中,静态方法 esRDD 用于从 Elasticsearch中读取索引,而 saveToEs 则用于向 Elasticsearch 中存储数据。

 示例展示了从 Elasticsearch 中读取索引数据的代码片段:


  List<Row> jobs = esRDD(sparkContext, metricIndex, queryString).values().map(
                map -> {
                    Object[] values = new Object[featureFields.length+1];
                    for (int i = 0; i < featureFields.length; i++) {
                        values[i] = map.get(featureFields[i]);
                    }
                    values[featureFields.length] = map.get(labelField);
                    return RowFactory.create(values);
                }
        ).collect();
        StructField[] structFields = new StructField[featureFields.length + 1];
        for (int i = 0; i < featureFields.length; i++) {
            structFields[i] = new StructField(featureFields[i], DataTypes.StringType, true, Metadata.empty());
        }
        structFields[featureFields.length] = new StructField(labelField, DataTypes.StringType, true, Metadata.empty());
        StructType schema = new StructType(structFields);
        Dataset<Row> dataset = sparkSession.createDataFrame(jobs, schema);

示例1.4 使用 Spark 读入数据


示例中,sparkContextSparkContext 实例,代表了与 Spark 运算环境相关的一些基本信息。MetricIndex 则是 esRDD 方法要读取文档数据的索引名称,而 queryString 则是检索文档的查询条件。esRDD 返回的类型为 JavaPairRDD,这是类似于一个 Pair 的集合。

 

每一个 Pair 代表索引中的一个文档,Pair 的 key 是字符串类型,代表了当前文档的标识符;而 value 则是一个 Map 类型,代表了整个文档数据。通过 JavaPairRDD 的 keys 方法将只返回所有文档的标识符,而通过 values 方法则会只返回所有文档的 Map 对象。

 

Map 的键为文档字段名称,而值则为文档字段对应的具体数值。示例中的代码就是通过调用

values 方法只获取文档的 Map 对象,然后再通过 Map 和 collect 方法将它们转换成 Row 的集合。

 


《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.1.基于Elasticsearch实现预测系统(3) https://developer.aliyun.com/article/1226131

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5月前
|
运维 监控 Java
探索Elasticsearch在Java环境下的全文检索应用实践
【6月更文挑战第30天】在大数据背景下,Elasticsearch作为分布式搜索分析引擎,因其扩展性和易用性备受青睐。本文指导在Java环境中集成Elasticsearch,涉及安装配置、使用RestHighLevelClient连接、索引与文档操作,如创建索引、插入文档及全文检索查询。此外,还讨论了高级查询、性能优化和故障排查,帮助开发者高效处理非结构化数据全文检索。
169 0
电子书阅读分享《Elasticsearch全观测技术解析与应用(构建日志、指标、APM统一观测平台)》
电子书阅读分享《Elasticsearch全观测技术解析与应用(构建日志、指标、APM统一观测平台)》
|
1月前
|
机器学习/深度学习 存储 运维
探索未来:结合机器学习功能拓展Elasticsearch应用场景
【10月更文挑战第8天】随着数据量的爆炸性增长,高效的数据存储、检索和分析变得越来越重要。Elasticsearch 作为一个分布式的搜索和分析引擎,以其强大的全文搜索能力、实时分析能力和可扩展性而闻名。近年来,随着机器学习技术的发展,将机器学习集成到 Elasticsearch 中成为了一种新的趋势,这不仅增强了 Elasticsearch 的数据分析能力,还开拓了一系列新的应用场景。
49 7
|
3月前
|
运维 监控 数据可视化
Elasticsearch全观测技术解析问题之面对客户不同的场景化如何解决
Elasticsearch全观测技术解析问题之面对客户不同的场景化如何解决
|
3月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
202 3
|
3月前
|
机器学习/深度学习 存储 运维
Elasticsearch 中的异常检测机制与应用场景
【8月更文第28天】随着数据量的增长和业务复杂性的提升,实时监测和分析大量数据成为一项挑战。Elasticsearch 不仅是一个高性能的全文搜索引擎,也是一个灵活的数据存储和分析平台。通过集成机器学习(ML)功能,Elasticsearch 能够实现更高级的数据分析任务,如异常检测。
62 0
|
6月前
|
搜索推荐 Java 数据库
springboot集成ElasticSearch的具体操作(系统全文检索)
springboot集成ElasticSearch的具体操作(系统全文检索)
|
6月前
|
运维 架构师 搜索推荐
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
84 4
|
6月前
|
运维 监控 Java
探索Elasticsearch在Java环境下的全文检索应用实践
【4月更文挑战第17天】本文介绍了在Java环境下使用Elasticsearch实现全文检索的步骤。首先,简述了Elasticsearch的功能和安装配置。接着,通过Maven添加`elasticsearch-rest-high-level-client`依赖,创建`RestHighLevelClient`实例连接Elasticsearch。内容包括:创建/删除索引,插入/查询文档。还探讨了高级全文检索功能、性能优化和故障排查技巧。通过Elasticsearch,开发者能高效处理非结构化数据,提升应用程序价值。
92 6
|
6月前
|
存储 安全 数据处理
Elastic 中国开发者大会2023最新干货——Elasticsearch 7、8 新功能一网打尽
Elastic 中国开发者大会2023最新干货——Elasticsearch 7、8 新功能一网打尽
69 0

相关产品

  • 检索分析服务 Elasticsearch版