《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.6.运用Elastic Stack分析COVID-19数据(1) https://developer.aliyun.com/article/1225617
配置 Filebeat 及导入数据到 Elasticsearch
为了能够把我们的数据导入到 Elasticsearch 中,我们可以采用 Filebeat。为此,我们必须来配置 Filebeat。我们首先来创建一个定制的 filebeat_covid19.yml 配置文件,并把这个文件存于和我们上面的 covid19.csv 同一个目录中:
filebeat_covid19.yml
filebeat.inputs: - type: log paths: - /Users/liuxg/data/covid19/covid19.csv exclude_lines: ['^Lat'] output.elasticsearch: hosts: ["http://localhost:9200"] index: covid19 setup.ilm.enabled: false setup.template.name: covid19 setup.template.pattern: covid19
在上面我们定义了一个 type 为 log 的 filebeat.inputs。我们定了我们的 log 的路径。你需要根据自己的实际路径来修改上面的路径。
我们定义了数据的 index 为 covid19 的索引。值得注意的是,由于 csv 文件的第一行是数据的 header,我们需要去掉这一行。为此,我们采用了 exclude_lines: ['^Lat'] 来去掉第一行。
等我们定义好上面的配置后,我们运行如下的命令:
./filebeat -e -c ~/data/covid19/filebeat_covid19.yml
上面的命令将把我们的数据导入到 Elasticsearch 中。
Filebeat 的 registry 文件存储 Filebeat 用于跟踪上次读取位置的状态和位置信息。如果由于某种原因,我们想重复对这个 csv 文件的处理,我们可以删除如下的目录:
· data/registry 针对 .tar.gz and .tgz 归档文件安装
· /var/lib/filebeat/registry 针对 DEB 及 RPM 安装包
· c:\ProgramData\filebeat\registry 针对 Windows zip 文件
如果上面的命令成功后,我们可以在 Kibana 中查看新生产的 covid19 索引:
如果可以对数据进行查询:
GET covid19/_search
在上面,它显示了 message 字段。
显然这个数据是最原始的数据,它并不能让我们很方便地对这个数据进行分析。那么我们该如何处理呢?我们可以通过 Elasticsearch 的 ingest node 提供的强大的 processors 来帮我们处理这个数据。
利用 Processors 来加工数据
去掉无用的字段
在我们的文档里,我们可以看到有很多我们并不想要的字段,比如 ecs, host,log 等等。我们想把这些字段去掉,那么我们该如何做呢?我们可以通过定义一 个 pipleline 来帮我们处理。为此,我们定义一个如下的 pipeline:
PUT _ingest/pipeline/covid19_parser { "processors": [ { "remove": { "field": ["log", "input", "ecs", "host", "agent"], "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != n ull && ctx.agent != null" } } ] }
上面的 pipeline 定义了一个叫做 remove 的 processor。它检查 log,input, ecs, host 及 agent 都不为空的情况下,删除字段 log, input,ecs, host 及 agent。我们在 Kibana 中执行上面的命令。
为了能够使得我们的 pipleline 起作用,我们通过如下指令来执行:
POST covid19/_update_by_query?pipeline=covid19_parser
当我们执行完上面的指令后,我们重新查看我们的文档:
在上面我们可以看出来,所有的我们不想要的字段都已经被去掉了
替换引号
我们可以看到导入的 message 数据为:
"""37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,221,0,0"""
显然,这里的数据有很多的引号"字符,我们想把这些字符替换为符号'。为此,我们需要 gsubprocessors 来帮我们处理。我重新修改我们的 pipeline:
PUT _ingest/pipeline/covid19_parser { "processors": [ { "remove": { "field": ["log", "input", "ecs", "host", "agent"], "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host ! = null && ctx.agent ! = null" } }, { "gsub": { "field": "message", "pattern": "\"", "replacement": "'" } } ] }
在 Kibana 中运行上面的指令,并同时执行:
POST covid19/_update_by_query?pipeline=covid19_parser
经过上面的 pipeline 的处理后,我们重新来查看我们的文档:
从上面的显示中,我们也看出来我们已经成功都去掉了引号。我们的 message 的信息如下:
"37.1232245,-78.4927721,'Virginia, US',Virginia,',',US,221,0,0"
《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.6.运用Elastic Stack分析COVID-19数据(3) https://developer.aliyun.com/article/1225615