2w 字带你实战 ElasticSearch (上)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 2w 字带你实战 ElasticSearch (上)


一、 基础知识

你会如何处理实时或准实时数据流?

在大数据时代,有很多方案可以帮助你完成这项任务。

接下来,我将通过一个系列的教程,我将利用Storm、Kafka、ElasticSearch逐步教你搭建一个实时计算系统。

搭建系统之前,我们首先需要了解一些定义。

通过考虑四个不同的属性,帮助你更好地理解大数据:数据量,速度,多样性和准确性。

  • 数据量 :海量数据
  • 速度 :数据处理的速度
  • 多样性 :任何类型的数据,包括结构化和非结构化
  • 准确性 :传入和传出数据的准确性

存在具有不同用途的大数据工具:

  • 数据处理 工具对数据执行某种形式的计算
  • 数据传输 工具将数据收集和引入数据处理工具
  • 数据存储工具 在不同处理阶段存储数据

数据处理工具可进一步分类为:

  • 批处理 :批处理是要一起处理的数据的集合。批处理允许你将不同的数据点连接、合并或聚合在一起。在整个批处理完成之前,其结果通常不可用。批处理越大,等待从中获取有用信息的时间越长。如果需要更直接的结果,流处理是更好的解决方案。
  • 流处理 :流处理器作用于无限制的数据流,而不是连续摄取的一批数据点(“流”)。与批处理过程不同,流中没有明确定义的数据流起点或终点,而且,它是连续的。

批处理流梳理

示例数据和方案

我们将使用一些实际数据来数据规约系统(DRS)。根据维基百科,“数据规约是将数字或字母数字信息…转换为校正,有序和简化的形式。基本概念是将大量数据规约为有意义的形式。”

数据源将是实际的航空公司历史飞行数据,我们的最终目标是能够在地图上显示航班历史数据。

我们将构建的最终数据处理链路如下图所示:

可以使用SMACK替代上述方案:

  • Spark:引擎(替代Storm)
  • Mesos:容器
  • Akka:模型
  • Cassandra:存储(替代ElasticSearch)
  • Kafka:消息队列

或者,你可以尝试自己使用自己喜欢的编程语言来实现它。

单线程调度程序使用以下方式以循环方式将工作分配给多个处理器(例如,可以是Raspberry Pi的阵列)。MQTT用于数据交换。每个处理器并行处理数据并产生结果,这些结果由收集器收集,收集器负责将其存储到数据库,NAS或实时呈现。由于我们没有与用于接收实时飞行数据的真实传感器(例如雷达)建立任何连接以演示实际流处理,因此我们只能选择批处理(即下载历史飞行数据并离线处理它们)。

我们将首先将数据直接存储到ElasticSearch并在Kibana或其他UI应用程序中可视化它们。

ElasticSearch

ElasticSearch是一个面向文档的分布式搜索引擎,用于处理以文档形式存储数据。

ElasticSearch具有如下优势:

  • 跨多个节点可扩展
  • 搜索结果速度非常快
  • 多语种
  • 面向文档
  • 支持即时搜索
  • 支持模糊搜索
  • 开源,不收费

ElasticStack由许多产品组成:

  • ElasticSearch :我们将在本文中重点介绍的
  • Kibana :一个分析和可视化平台,可让你轻松地可视化Elasticsearch中的数据并进行分析
  • LogStash :数据处理管道
  • Beats :数据传输集合
  • X-pack :可为Elasticsearch和Kibana添加其他功能,例如安全性(身份验证和授权),性能(监控),报告和机器学习

综上所述,可以使用Beats和/或Logstash将数据导入Elasticsearch,也可以直接通过ElasticSearch的API。Kibana用于可视化ElasticSearch中的数据。

接下来,我们将学习如何安装,启动和停止ElasticSearch和Kibana。在下一篇文章中,我们将提供产品概述,并学习如何将批量航班数据导入ElasticSearch。

1. 安装ElasticSearch & Kibana

访问ElasticSearch网站,下载安装包,解压,然后进行接下,你会发现它包含如下内容:

bin
config
data
jdk
lib
logs
modules
plugins

它的主要配置文件是config/elasticsearch.yml

通过如下命令,可以运行ElasticSearch,

cd <elasticsearch-installation>
bin/elasticsearch

使用浏览器打开链接http:// localhost:9200/,如果看到类似以下的内容,那么恭喜你,你已经正常运行ElasticSearch实例了。

{
  "name" : "MacBook-Pro.local",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "jyxqsR0HTOu__iUmi3m3eQ",
  "version" : {
    "number" : "7.9.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "a479a2a7fce0389512d6a9361301708b92dff667",
    "build_date" : "2020-08-11T21:36:48.204330Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

ElasticSearch由一组节点组成(也就是存储数据的ElasticSearch实例),每个节点存储部分数据,同一台计算机上运行多个实例。

http://localhost:9200/_cluster/health?pretty
{
  "cluster_name" : "elasticsearch",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

集群状态为green,我们看到它仅包含1个节点。数据作为JSON对象(或文档)存储在ElasticSearch中,使用索引将文档组织在群集内。索引是具有相似特征并在逻辑上相关的文档的集合,通过索引,在逻辑上将文档分组在一起,并提供与可伸缩性和可用性相关的配置选项。

数据分布在各个节点中,但是,实际上是如何实现的呢?

ElasticSearch使用分片。

分片是一种将索引分为不同部分的方法,其中每个部分称为分片,分片可水平缩放数据。

如果发生磁盘故障并且存储分片的节点发生故障,该怎么办?

如果我们只有一个节点,那么所有数据都会丢失。

默认情况下,ElasticSearch支持分片复制以实现容错功能。主碎片的副本碎片在存储主碎片的节点以外的节点中创建。主分片和副本分片都称为复制组。在我们只有一个节点的示例中,没有复制发生。如果磁盘出现故障,我的所有数据都会丢失。我们添加的节点越多,通过在节点周围散布碎片就可以提高可用性。

ElasticSearch集群暴露REST API,使得开发者可以通过GETPOSTPUTDELETE命令进行访问。

有多种方法可以向ElasticSearch发出命令。

  • 通过在浏览器中或使用curl命令
  • 通过Kibana的控制台工具

curl的访问语法如下:

curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'

参数解释:

  • <VERB>:HTTP请求方法,GETPOSTPUTDELETE
  • <PROTOCOL>.:如果你在Elasticsearch前面有HTTPS代理,或者使用Elasticsearch安全功能来加密HTTP通信,请使用后者
  • <HOST>:Elasticsearch集群中任何节点的主机名
  • <PORT>:运行Elasticsearch HTTP服务的端口,默认为9200
  • <PATH>:API的endpoint
  • <BODY>:JSON编码的请求正文

例如:

curl -X GET "localhost:9200/flight/_doc/1?pretty"

将返回存储在索引中的所有文档,由于我们尚未在ElasticSearch中插入任何文档,因此该查询将返回错误。

前面介绍了ElasticSearch的安装方法,下面介绍一下Kibana的安装。

访问网站下载安装包,解压,通过下方命令运行Kibana:

cd <kibana-installation>
bin/kibana

在启动Kibana之前,请确保ElasticSearch已启动并正在运行。

Kibana的目录结构如下:

bin
built_assets
config
data
node
node_modules
optimize
package.json
plugins
src
webpackShims
x-pack

首次运行Kibana(http://localhost:5601)时,会让你提供样本数据或自行探索。

使用浏览器发送下方命令:

GET /_cat/health?v

会得到下方信息:

epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1585684478 19:54:38 elasticsearch green 1 1 6 6 0 0 7 0 - 100.0%

_catAPI提供有关属于群集的节点的信息。

有一个更方便的API GET /_cat/indices?pretty,它提供了有关节点的更多详细信息。

health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .apm-custom-link ticRJ0PoTk26n8Ab7-BQew 1 0 0 0 208b 208b
green open .kibana_task_manager_1 SCJGLrjpTQmxAD7yRRykvw 1 0 6 99 34.4kb 34.4kb
green open .kibana-event-log-7.9.0-000001 _RqV43r_RHaa-ztSvhV-pA 1 0 1 0 5.5kb 5.5kb
green open .apm-agent-configuration 61x6ihufQfOiII0SaLHrrw 1 0 0 0 208b 208b
green open .kibana_1 lxQoYjPiStuVyK0pQ5_kaA 1 0 22 1 10.4mb 10.4mb

在这一部分,我主要介绍了一下搭建数据规约系统涉及到的一些基本概念,以及ElasticSearch、Kibana的安装,确保,这两款关键工具能够正常运行。

在下一篇文章中,我们将看到如何将批量航班数据导入到ElasticSearch,并查看如何实际搜索它们。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、ElasticSearch操作

在前面这一部分,我已经解释了ElasticSearch的基础知识及其工作原理。

在这一部分,我们将学习如何在ElasticSearch中执行搜索。

CRUD

在开发过程中,主要都在围绕着数据的CRUD进行处理,具体来说就是:

  • C – Create
  • R – Retrieve or Read
  • U – Update
  • D – Delete

下表将每个CRUD命令与其各自的ElasticSearch HTTP / REST命令进行了一一对应,

CRUD command HTTP/REST command
Create PUT or POST
Read GET
Update PUT or POST
Delete DELETE

上一篇文章中,我们学习了Kibana,接下来,就切换到Kibana的控制台。

1. 创建索引

通过如下命令,创建一个flight索引:

PUT /flight
GET /_cluster/health

请注意,现在群集的运行状况已从绿色变为黄色。发生这种情况是因为我们仅运行一个Elasticsearch实例。单节点群集具有完整的功能,但是无法将数据复制到其他节点以提供弹性。副本分片必须有其他可用节点,群集状态才能变为绿色。如果群集状态为红色,则标识某些数据不可用。

为了解决这个问题,您需要安装另一个同样的Elasticsearch,并在elasticsearch.yml中更改node.name;两个实例中的cluster.name必须保持相同(默认为elasticsearch)。

另一种方法是在命令行上将配置参数传递给Elasticsearch。

bin/elasticsearch -Enode.name=node-2 -Epath.data=./node-2/data -Epath.logs=./node-2/logs
GET /_cat/indices?v
health status index  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   flight w696u4y3SYWuGW--8VzW6Q   1   1          0            0       208b           208b

2. 创建文档

下面,向我们的索引添加一些示例数据:

PUT /flight/_doc/1 
{
  "Icao":"A0835D",
  "Alt":2400,
  "Lat":39.984322,
  "Long":-82.925616
}

也可以使用curl命令:

curl -X PUT "localhost:9200/flight/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "Icao":"A0835D",
  "Alt":2400,
  "Lat":39.984322,
  "Long":-82.925616
}'

在这种情况下,ElasticSearch将为我们的文档生成一个自动ID。这是ElasticSearch返回的结果:

Content-Type对于查询成功至关重要, 我们创建了一个ID = 1的新排期。我们也可以使用POST代替PUT,但是在这种情况下,我们无法传递ID。

在这种情况下,ElasticSearch将为我们的文档生成一个自动ID。

下面是ElasticSearch返回的结果:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "flight",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "Icao" : "A0835D",
          "Alt" : 2400,
          "Lat" : 39.984322,
          "Long" : -82.925616
        }
      }
    ]
  }
}

结果文档存储在键值_source内。

3. 删除文档

如果你知道文档索引,可以直接通过索引进行删除:

DELETE /flight/_doc/1

4. 删除索引

通过下方命令删除索引:

DELETE /flight

5. 批量导入数据

我们的方案是处理航班数据,理想情况下,这些数据是从多个传感器(雷达)实时获得的,但是由于这很难实现。

因此,我们将使用可从此处下载的批量历史飞行数据。

在下载批处理文件的目录中,发送以下命令(每个.json文件):

curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flights/_bulk --data-binary "@2016-07-01-1300Z.json"

请注意,内容类型是application/x-ndjson,而不是application/x-json

另外,请注意,我们将数据表示为二进制以便保留换行符。

磁瓦ElasticSearch需要json文档满足特定格式:

{"index":{"_id":4800770}}
{"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...}
...

这意味着你必须将每个下载的.json文件转换为上述格式。

如果你不想花时间手动修改.json文档,则在下一篇文章中,我们将开发一个Java程序来解析它们,并使用ElasticSearch的REST API将文件插入ElasticSearch中。

6. 搜索查询

ElasticSearch是一款搜索相关的工具,它允许你进行符合条件的搜索查询。

GET /flight/_search?pretty
{ "query": {
     "match_all" : {
     }
   }
}

上面的搜索查询匹配索引对应的所有文档。也可以这样简化:

GET /flight/_search

下面是根据给定字段Icao进行查询:

GET /flight/_search?pretty 
{ "query": {
     "match" : {
      "Icao" : "A0835D"
     }
   }
}

也可以用嵌入URL进行搜索:

GET /flight/_search?q=Icao:A0835D

也可以这样写:

GET /flight/_search?pretty 
{ "query": {
     "query_string": {
      "query": "Icao:A0835D"
     }
   }
}

除了“match”和“query_string”以外,还可以使用“term”。使用“ term”表示精确匹配。

GET /flight/_search?pretty 
{ "query": {
     "term": {
      "Mil": true
     }
   }
}

你也可以使用“term”来搜索值数组。

除此之外,还可以使用通配符“wildcard”进行搜索,包括*/?

GET /flight/_search?pretty 
{ "query": {
     "wildcard": {
      "Call": "NJ*"
     }
   }
}

7. 更新文档

如果你知道索引的ID,可以通过_updateAPI进行更新。

POST /flight/_update/4800770
{
  "doc": {
    "Mil": true
  }
}

使用上述命令,我们也可以将新字段添加到文档中。

附带说明一下,ElasticSearch文档是不可变的!

因此,当我们请求更新文档时,ElasticSearch会在后台进行操作,它检索文档,更改其字段并为具有相同ID的文档重新索引,从而对它进行替换。

可以使用脚本发送更复杂的查询,

POST /flight/_update/4800770
{
  "script": {
    "source": "ctx._source.FlightsCount++" 
  }
}

ctx表示上下文。

还有许多其他更新文档的方法,例如,upserts,即根据文件是否已存在有条件地更新或插入文件。

POST /flight/_update/4800771
{
  "script": {
    "source": "ctx._source.FlightsCount++" 
  },
  "upsert": {
    "Rcvr":1,
    "HasSig":false,
    "Icao":"AE4839",
    ... 
  },
}

8. 删除文档

使用_delete_by_query API可以删除文档:

POST /flight/_delete_by_query
{
  "query": {
    "match_all": {}
  }
}

9. 批量查询

批量API可帮助我们通过一个查询对许多文档执行同样的操作。

该API包含4个动作:索引,创建,更新,删除:

POST /_bulk
{ "index": { "_index" : "flight", "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_index" : "flight", "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_index" : "flight", "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_index" : "flight", "_id": 4800770 } }

索引和创建操作之间的区别如下:如果文档已经存在,则创建将引发错误,而索引将替换文档。

如果批量查询要针对相同的索引运行,那么我们可以像这样简化查询:

POST /flight/_bulk
{ "index": { "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_id": 4800770 } }

10. 映射

ElasticSearch是如何映射数据的呢?

动态映射意味着没有明确定义映射,或者至少没有为某些字段定义。

ElasticSearch是通过检查文档字段的值类型来完成的。

要查看数据映射,请在Kibana中执行以下内容:

GET /flight/_mapping

我们也可以通过下方命令手动添加映射关系,

PUT /flight/_mapping 
{
   "properties": {
     "location": {
       "type": "geo_point"
     }
   }
}

请注意,一旦创建了字段映射,就不能对其进行修改。唯一的方法是删除并重新创建索引。

在下面的示例中,我们手动创建了各种禁用动态映射的映射。

PUT /flight/_mapping
{
    "dynamic": false,
    "properties": {
      "Rcvr": {
        "type": "integer"
      },
      "Icao": {
        "type": "text"
      },
      ...
      "location": {
        "type": "geo_point"
      }
   }
}

如果你更新了映射,请在禁用动态映射的情况下发出以下查询来更新ElasticSearch,

POST /flight/_update_by_query?conflicts_proceed

在这部分,我重点介绍了如何使用ElasticSearch的常用功能。

在下一一部分中,我们将学习如何在将json文件转换为ElasticSearch的批量API所需的格式之后,以及通过使用JSON库解析json文件,并将批处理json文件导入到ElasticSearch中。

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2天前
|
存储 JSON 搜索推荐
Springboot2.x整合ElasticSearch7.x实战(三)
Springboot2.x整合ElasticSearch7.x实战(三)
8 0
|
2天前
|
存储 自然语言处理 关系型数据库
Springboot2.x整合ElasticSearch7.x实战(二)
Springboot2.x整合ElasticSearch7.x实战(二)
6 0
|
2天前
|
搜索推荐 数据可视化 Java
Springboot2.x整合ElasticSearch7.x实战(一)
Springboot2.x整合ElasticSearch7.x实战(一)
6 0
|
22天前
|
自然语言处理 测试技术 网络安全
ElasticSearch7最新实战文档-附带logstash同步方案
ElasticSearch7最新实战文档-附带logstash同步方案
22 0
|
1月前
|
人工智能 自然语言处理 开发者
Langchain 与 Elasticsearch:创新数据检索的融合实战
Langchain 与 Elasticsearch:创新数据检索的融合实战
46 10
|
1月前
|
canal 自然语言处理 关系型数据库
Elasticsearch 线上实战问题及解决方案探讨
Elasticsearch 线上实战问题及解决方案探讨
24 0
|
1月前
|
监控 API 索引
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
实战问题:Elasticsearch 2.X 数据如何迁移到 7.X?
17 0
|
1月前
|
Java Maven 开发工具
【ElasticSearch 】IK 分词器安装
【ElasticSearch 】IK 分词器安装
38 1
|
2天前
Elasticsearch安装配置文件
Elasticsearch安装配置文件
8 0
|
1月前
|
存储 数据可视化 数据挖掘
【ElasticSearch】ElasticSearch安装
【ElasticSearch】ElasticSearch安装
35 2