【最佳实践】运用 Logstash Fingerprint 过滤器处理并删除 Elasticsearch 重复数据

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 这篇文章介绍了使用 Logstash 在 Elasticsearch 中对数据进行重复数据删除的方法。 根据你的用例,Elasticsearch中 的重复内容可能不被接受。 例如,如果你要处理指标,则 Elasticsearch中 的重复数据可能会导致错误的聚合和不必要的警报。 即使对于某些搜索用例,重复的数据也可能导致不良的分析和搜索结果。
本文作者:刘晓国,Elastic公司社区布道师。新加坡国立大学硕士,西北工业大学硕士,曾就职于新加坡科技,康柏电脑,通用汽车,爱立信,诺基亚,Linaro,Ubuntu,Vantiq等企业。

*如果你想一站式快速体验 Elasticsearch 所有功能(包括 X-pack 付费功能),开通 阿里云 Elasticsearch 1核2G,即可首月免费试用
*

背景:Elasticsearch 索引

在介绍重复数据删除解决方案之前,让我们简要介绍一下 Elasticsearch 的索引编制过程。 Elasticsearch 提供了一个 REST API 来为你的文档建立索引。你可以选择提供唯一代表您的文档的 ID,也可以让 Elasticsearch 为你生成ID。如果您将 HTTP PUT 与索引 API 一起使用,Elasticsearch 希望您提供一个 ID。如果已经存在具有相同 ID 的文档,Elasticsearch 将用你刚才提供的文档替换现有内容-最后索引的文档将获胜。如果使用 POST 动词,则即使语料库中已经存在内容,Elasticsearch 也会生成具有新ID的新文档。例如,假设你刚在一秒钟之前为博客文章建立了索引,并使用 POST 动词重新发送了同一篇博客文章,Elasticsearch 创建了另一个具有相同内容但新具有 ID 的文档。

虽然 Elasticsearch 提供了一个显式的 _update API,可以将其用作潜在的解决方法,但我们将把本文重点放在索引 API 上。

Logstash 的 Elasticsearch 输出使用索引API,并且默认情况下不希望提供 ID。因此,它将每个单个事件视为单独的文档。但是,有一个选项可让你轻松为 Logstash 中的每个事件设置唯一的 ID。

定义你的ID

如果你的数据源已经有一个ID,那么在索引到 Elasticsearch 之前很容易将其设置为文档ID。 例如,JDBC 输入的用户可以轻松地将源表中的主键用作 Elasticsearch ID。 使用字段引用语法,可以在输出部分中直接设置文档 ID:

output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[upc_code]}"
  }
}

删除重复的相似内容

如前所述,在你的用例中,重复的内容可能是不可接受的。使用称为指纹的概念和 Logstash 指纹过滤器(fingerprint),您可以创建一个称为指纹的新字符串字段,以唯一地标识原始事件。指纹过滤器可以将原始事件中的一个或多个字段(默认为消息字段)作为源来创建一致的哈希值 (hash)。一旦创建了这些指纹,你就可以将其用作下游Elasticsearch输出中的文档ID。这样,Elasticsearch 将仅在比较指纹后更新或覆盖现有文档内容,但绝不会复制它们。如果你想考虑更多字段以进行删除重复数据,则可以使用 concatenate_sources 选项。

指纹过滤器具有多种算法,您可以选择创建此一致的哈希(hash)。请参阅文档,因为每个函数的哈希强度不同,可能需要其他选项。在下面的示例中,我们使用 MURMUR3 方法从消息字段创建哈希并将其设置在元数据字段中。元数据字段不会发送到输出,因此它们提供了一种在处理管道中的事件时临时存储数据的有效方法。

filter {
  fingerprint {
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
 
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}

如果使用任何加密哈希函数算法(例如SHA1,MD5),则需要提供密钥选项。 密钥可以是用于计算 HMAC 的任意字符串。

filter {
  fingerprint {
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "SHA1",
    key => "Log analytics",
    base64encode => true
  }
}
 
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}

密钥的其他示例可以是 departmentID,组织 ID 等。

意外重复:从 Logstash 生成 UUID

先前的用例涉及内容的有意识地删除重复数据。在某些部署中,尤其是 Logstash 与可确保至少交付一次的持久性队列或其他排队系统一起使用时,Elasticsearch 中可能存在重复项。如果 Logstash 在处理过程中崩溃,则重新启动时将重播队列中的数据-这可能导致重复。为了减少这种情况造成的重复,可以对每个事件使用 UUID。这里的重点是,在将数据序列化到消息队列之前,需要在生产方(即发布到排队系统的 Logstash 实例)上生成UUID。这样,Logstash使用者在从崩溃还原或重新启动时需要重新处理事件时,将保留相同的事件ID。

如果你的源数据没有唯一标识符,则可以使用同一指纹过滤器来生成 UUID。请记住,此方法不考虑事件本身的内容,而是为每个事件生成 version 4 UUID。

filter {
  fingerprint {
    target => "%{[@metadata][uuid]}"
    method => "UUID"
  }
}
 
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][uuid]}"
  }
}

如果在 Logstash 生产者和使用者之间使用队列,则必须显式复制@metadata字段,因为它们不会持久化到输出中。 另外,你可以使用以下常规字段:

filter {
  fingerprint {
    target => "generated_id"
    method => "UUID"
  }
}
 
output {
  kafka {
    brokers => "example.com"
    ...
  }
}

从消费者方面,您可以只使用:

input {
  kafka {
    brokers => "example.com"
  }
}
 
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[generated_id]}"
  }
}

例子

在下面,我们用一个实际的例子来展示,这个是如工作的。首先让我们先创建一个叫做 logstash_fingerprint.conf 的 Logstash 配置文件:

logstash_fingerprint.conf

input {
    http {
        id => "data_http_input"
    }
}
 
filter {
    fingerprint {
        source => [ "sensor_id", "date"]
        target => "[@metadata][fingerprint]"
        method => "SHA1"
        key => "liuxg"
        concatenate_sources => true
        base64encode => true
    }
}
 
output {
    stdout {
        codec => rubydebug
    }
 
  elasticsearch {
        manage_template => "false"
        index => "fingerprint"
        hosts => "localhost:9200"
        document_id => "%{[@metadata][fingerprint]}"
    }
}

在这里,我们使用 http input 来收集数据。在这里,我们使用 sensor_id 及 date 这两个字段来生成一个 fingerprint。也就是说,只有这两个字段是一样的,那么无论我们输入多少次数据,那么在 Elasticsearch 中将不会有新的数据生成,因为它们的 ID 都是一样的。 我们启动 Logstash:

sudo ./bin/logstash -f ~/data/fingerprint/logstash_fingerprint.conf 

我们可以在另外一个 console 中打入如下的命令:

curl -XPOST --header "Content-Type:application/json"http://localhost:8080/" -d '{"sensor_id":1, "date": "2015-01-01", "reading":16.24}'

这个时候,我们可以在 Logstash 的 console 中查看到:

image.png

我们在 Kibana 的 Dev Tools 中进行查看:

GET _cat/indices

我们可以看到有一个新的 fingerprint 的索引已经生产了。

2.png

我们查看 fingerprint 的文档数:

GET fingerprint/_count

结果显示:

{
  "count" : 1,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

我们在另外一个 console 中打入无数次的如下的命令:

image.png

我们发现,只要是 sensor_id 和 date 的值都是一样的,那么 fingerprint 的文档数永远是 1。当然你也可以更新其它字段的值,比如 reading 字段的值为20,那么新的值将会在里面得以体现。这个操作相当于更新的操作。

如果我们改动一下 sensor_id 的值为2,也就是:

curl -XPOST --header "Content-Type:application/json"http://localhost:8080/" -d '{"sensor_id":2, "date": "2015-01-01", "reading":16.24}'

那么我们重新查看 fingerprint 索引的文档数:

GET fingerprint/_count

image.png

上面显示文档的数值为2。也就是说,在索引 fingerprint 中,只要是 sensor_id 及 date 的数值是一样的,那么我们将永远只有一个文档,而且是永远不会重复的。

结论

如你在本文中所看到的,指纹过滤器可以用于多种用途,并且是你应该在Logstash生态系统中熟悉的插件。它可以很方便地让我们保持我们的文档的唯一性,而不招致重复的数据生成。

声明:本文由原文作者“ Elastic 中国社区布道师——刘晓国”授权转载,对未经许可擅自使用者,保留追究其法律责任的权利。

image.png

阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费 X-pack服务(单节点价值$6000)

相关活动


更多折扣活动,请访问阿里云 Elasticsearch 官网

阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费
下载白皮书:Elasticsearch 八大经典场景应用


image.png

image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
5天前
|
存储 监控 安全
|
25天前
|
存储 缓存 监控
深入解析:Elasticsearch集群性能调优策略与最佳实践
【10月更文挑战第8天】Elasticsearch 是一个分布式的、基于 RESTful 风格的搜索和数据分析引擎,它能够快速地存储、搜索和分析大量数据。随着企业对实时数据处理需求的增长,Elasticsearch 被广泛应用于日志分析、全文搜索、安全信息和事件管理(SIEM)等领域。然而,为了确保 Elasticsearch 集群能够高效运行并满足业务需求,需要进行一系列的性能调优工作。
51 3
|
26天前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
156 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
4月前
|
存储 人工智能 自然语言处理
阿里云Elasticsearch AI场景语义搜索最佳实践
本文介绍了如何使用阿里云Elasticsearch结合搜索开发工作台搭建AI语义搜索。
17290 68
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
68 0
|
3月前
|
存储 缓存 监控
|
3月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
186 3
|
3月前
|
数据采集 人工智能 自然语言处理
阿里云Elasticsearch AI语义搜索:解锁未来搜索新纪元,精准洞察数据背后的故事!
【8月更文挑战第2天】阿里云Elasticsearch AI场景语义搜索最佳实践
185 5
|
3月前
|
机器学习/深度学习 数据采集 缓存
Elasticsearch与机器学习集成的最佳实践
【8月更文第28天】Elasticsearch 提供了强大的搜索和分析能力,而机器学习则能够通过识别模式和预测趋势来增强这些能力。将两者结合可以实现更智能的搜索体验、异常检测等功能。
96 0

相关产品

  • 检索分析服务 Elasticsearch版