logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

简介: logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

使用 logstash 导入数据到 ES 时,由三个步骤组成:input、filter、output。整个导入过程可视为:unix 管道操作,而管道中的每一步操作都是由 "插件" 实现的。使用 ./bin/logstash-plugin list 查看 logstash 已安装的插件。

每个插件的选项都可以在官网查询,先明确是哪一步操作,然后去官方文档看是否有相应的插件是否支持这种操作。比如 output 配置选项:plugins-outputs-elasticsearch-options),其中的 doc_id 选项就支持 指定 docid 写入 ES。在这里,简要说明一些常用的插件,要想了解它们实现的功能可参考官方文档。

  1. mutate 插件 用于字段文本内容处理,比如 字符替换
  2. csv 插件 用于 csv 格式文件导入 ES
  3. convert 插件 用于字段类型转换
  4. date 插件 用于日期类型的字段处理

使用 logstash 导入时,默认以 "message" 标识 每一行数据,并且会生成一些额外的字段,比如 @version、host、@timestamp,如果用不着,这些字段可以去除掉 ,此外,要注意 ES 中的索引的格式 (Mapping 结构),最好是指定自定义的索引模板,保证索引最 "精简"。

另外这里记录一些常用的参数及其作用,更具体的解释可查看官方文档。

  1. sincedb_path 告诉 logstash 记录文件已经处理到哪一行了,从而当 logstash 发生故障重启时,可从故障点处开始导入,避免从头重新导入。
  2. remove_field 删除某些字段

配置文件完成后,执行以下命令./bin/logstash -f csvfile_logstash.conf 即可启动 logstash 执行导入操作。

以下是各种错误解决:
错误一:

ConfigurationError”, :message=>”Expected one of #, input, filter, output at line 1, column 1

如果 配置文件内容是正确的,用 Notepad++ 检查一下文件的编码,确保是:UTF-8 无 BOM 格式编码

解决 SOH 分隔符问题

由于 csv 插件的 separator 选项不支持转义字符,因此无法用\u0001来代表 SOH。如果 csv 文件以 SOH 分隔符 (\u0001) 分割,一种方案是使用 mutate 插件替换,将\u0001替换成逗号。如下所示:

    mutate{
        # 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
        gsub => [ "message","\u0001","," ]
        # @timestamp 字段是默认生成的, 名称修改成 created
        rename => ["@timestamp", "created"]
    }

但是实际上 logstash6.8.3 是支持按 SOH 分割的。在 Linux shell 下,先按 ctrl+v,再按 ctrl+a,输入的就是 SOH。那么在 vim 中打开配置文件,在 vim 的 insert 模式下,先按 ctrl+v,再按 ctrl+a,将 SOH 作为 csv 插件的 separator 分割符。

    csv {
            # 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
            # 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
            remove_field => ["host", "@timestamp", "@version", "message","path"]
        }

一个将 csv 文件内容导入 ES 的示例配置模板如下:(csv 文件中的每一行以 SOH 作为分割符)

  • logstash input 插件支持多种数据来源,比如 kafka、beats、http、file 等。在这里我们的数据来源是文件,因此采用了 logstash input file 插件。
  • 把数据从文件中读到 logstash 后,可能需要对文件内容 / 格式 进行处理,比如分割、类型转换、日期处理等,这由 logstash filter 插件实现。在这里我们进行了文件的切割和类型转换,因此使用的是 logstash filter csv 插件和 mutate 插件。
  • 处理成我们想要的字段后,接下来就是导入到 ES,那么就需要配置 ES 的地址、索引名称、Mapping 结构信息 (使用指定模板写入),这由 logstash output 插件实现,在这里我们把处理后的数据导入 ES,因此使用的是 logstash output elasticsearch 插件。
input {
  file {
      path => "/data/psj/test/*.csv"
      start_position => "beginning"
      sincedb_path => "/dev/null"
    }
}

filter {
    csv {
            # 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ""
            columns => ["topsid", "title"]
            # 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
            remove_field => ["host", "@timestamp", "@version", "message","path"]

        }            
    mutate {
    convert => {
        # 类型转换
        "topsid" => "integer"
        "title" => "string"
    }
  }
}

output {
   elasticsearch {
        hosts => "http://http://127.0.0.1:9200"
        index => "chantitletest"
        # 指定 文档的 类型为 "_doc"
        document_type => "_doc"
        # 指定doc id 为topsid字段的值
        document_id => "%{topsid}"
        manage_template => true
        # 使用自定义的模板写入,否则将会以logstash默认模板写入
        template => "/data/services/logstash-6.8.3/config/chantitletpe.json"
        template_overwrite => true
        template_name => "chantitletpe"
       }
    stdout{
        codec => json_lines
    }
}

(也可以采用 logstash filter 插件的 mutate 选项 将 SOH 转换成逗号):

filter {
    mutate{
        # 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号
        gsub => [ "message","\u0001","," ]
        # @timestamp 字段是默认生成的, 名称修改成 created
        rename => ["@timestamp", "created"]
    }
    csv {
        # 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)
            separator => ","
            columns => ["topsid", "title"]
            # 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)
            remove_field => ["host", "@timestamp", "@version", "message","path"]
        }            
    mutate {
    convert => {
        # 类型转换
        "topsid" => "integer"
        "title" => "string"
    }
  }
}

使用的自定义模板如下:

{
  "index_patterns": [
    "chantitle_v1",
    "chantitletest"
  ],
  "settings": {
    "number_of_shards": 3,
    "analysis": {
      "analyzer": {
        "my_hanlp_analyzer": {
          "tokenizer": "my_hanlp"
        },
        "pinyin_analyzer": {
          "tokenizer": "my_pinyin"
        }
      },
      "tokenizer": {
        "my_hanlp": {
          "enable_normalization": "true",
          "type": "hanlp_standard"
        },
        "my_pinyin": {
          "keep_joined_full_pinyin": "true",
          "lowercase": "true",
          "keep_original": "true",
          "remove_duplicated_term": "true",
          "keep_first_letter": "false",
          "keep_separate_first_letter": "false",
          "type": "pinyin",
          "limit_first_letter_length": "16",
          "keep_full_pinyin": "true"
        }
      }
    }
  },
  "mappings": {
    "_doc": {
      "properties": {
        "created": {
          "type": "date",
          "doc_values": false,
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "title": {
          "type": "text",
          "fields": {
            "pinyin": {
              "type": "text",
              "boost": 10,
              "analyzer": "pinyin_analyzer"
            },
            "raw": {
              "type": "keyword",
              "doc_values": false
            }
          },
          "analyzer": "my_hanlp_analyzer"
        },
        "topsid": {
          "type": "long",
          "doc_values": false
        }
      }
    }
  }
}

上面给了一个 csv 文件导入 ES,这里再给个 txt 文件导入 ES 吧。txt 以逗号分割,每列的内容都在冒号里面,只需要前 4 列内容,一行示例数据如下:

"12345","12345","研讨区","12345","500","xxxx","2008-08-04 22:20:24","0","300","0","5","0","","0","0","","","0","0"

这里采用的是 logstash filter 的 dissect 插件。相比于 grok 插件,它的优点不是采用正规匹配的方式解析数据,速度较快,但不能解析复杂数据。只能够对较为规律的数据进行导入。logstash 配置文件如下:

input {
  file {
      path => "/data/psj/test/*.txt"
      start_position => "beginning"
      # sincedb_path => "/dev/null"
    }
}

filter {
  dissect {
      mapping => {
        # 插件输入的每一行数据默认名称是message,由于每列数据在双引号里面,因此解析前4列数据的写法如下:
        "message" => '"%{topsid}","%{subsid}","%{subtitle}","%{pid}"'
      }
      # 删除自动生成的、用不着的一些字段
      remove_field => ["host", "@timestamp", "@version", "message","path"]
      convert_datatype => {
        # 类型转换
        "topsid" => "int"
        "subsid" => "int"
        "pid" => "int"
    }
    }
}

output {
   elasticsearch {
        hosts => "http://127.0.0.1:9200"
        index => "chansubtitletest"
        document_type => "_doc"
        # 指定doc id 为topsid字段的值
        document_id => "%{subsid}"
        manage_template => true
        # 使用自定义的模板写入,否则将会以logstash默认模板写入
        template => "/data/services/logstash-6.8.3/config/chansubtitle.json"
        template_overwrite => true
        template_name => "chansubtitle"
       }
    stdout{
        codec => json_lines
    }
}

更多优质内容请关注公号:汀丶人工智能;会提供一些相关的资源和优质文章,免费获取阅读。

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
4月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
4月前
|
存储 Linux iOS开发
Elasticsearch Enterprise 9.1.5 发布 - 分布式搜索和分析引擎
Elasticsearch Enterprise 9.1.5 (macOS, Linux, Windows) - 分布式搜索和分析引擎
405 0
|
5月前
|
JSON 监控 Java
Elasticsearch 分布式搜索与分析引擎技术详解与实践指南
本文档全面介绍 Elasticsearch 分布式搜索与分析引擎的核心概念、架构设计和实践应用。作为基于 Lucene 的分布式搜索引擎,Elasticsearch 提供了近实时的搜索能力、强大的数据分析功能和可扩展的分布式架构。本文将深入探讨其索引机制、查询 DSL、集群管理、性能优化以及与各种应用场景的集成,帮助开发者构建高性能的搜索和分析系统。
429 0
|
9月前
|
存储 安全 Linux
Elasticsearch Enterprise 9.0 发布 - 分布式搜索和分析引擎
Elasticsearch Enterprise 9.0 (macOS, Linux, Windows) - 分布式搜索和分析引擎
415 0
|
9月前
|
存储 Linux iOS开发
Elasticsearch Enterprise 8.18 发布 - 分布式搜索和分析引擎
Elasticsearch Enterprise 8.18 (macOS, Linux, Windows) - 分布式搜索和分析引擎
369 0
|
数据采集 人工智能 运维
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
本文介绍了阿里云 Elasticsearch 推出的创新型 AI 搜索方案
787 3
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
|
机器学习/深度学习 人工智能 运维
阿里云技术公开课直播预告:基于阿里云 Elasticsearch 构建 AI 搜索和可观测 Chatbot
阿里云技术公开课预告:Elastic和阿里云搜索技术专家将深入解读阿里云Elasticsearch Enterprise版的AI功能及其在实际应用。
652 2
阿里云技术公开课直播预告:基于阿里云 Elasticsearch 构建 AI 搜索和可观测 Chatbot
|
人工智能 自然语言处理 搜索推荐
云端问道12期实操教学-构建基于Elasticsearch的企业级AI搜索应用
本文介绍了构建基于Elasticsearch的企业级AI搜索应用,涵盖了从传统关键词匹配到对话式问答的搜索形态演变。阿里云的AI搜索产品依托自研和开源(如Elasticsearch)引擎,提供高性能检索服务,支持千亿级数据毫秒响应。文章重点描述了AI搜索的三个核心关键点:精准结果、语义理解、高性能引擎,并展示了架构升级和典型应用场景,包括智能问答、电商导购、多模态图书及商品搜索等。通过实验部分,详细演示了如何使用阿里云ES搭建AI语义搜索Demo,涵盖模型创建、Pipeline配置、数据写入与检索测试等步骤,同时介绍了相关的计费模式。
419 3
|
人工智能 算法 API
构建基于 Elasticsearch 的企业级 AI 搜索应用
本文介绍了基于Elasticsearch构建企业级AI搜索应用的方案,重点讲解了RAG(检索增强生成)架构的实现。通过阿里云上的Elasticsearch AI搜索平台,简化了知识库文档抽取、文本切片等复杂流程,并结合稠密和稀疏向量的混合搜索技术,提升了召回和排序的准确性。此外,还探讨了Elastic的向量数据库优化措施及推理API的应用,展示了如何在云端高效实现精准的搜索与推理服务。未来将拓展至多模态数据和知识图谱,进一步提升RAG效果。
482 1
|
数据采集 人工智能 运维
从企业级 RAG 到 AI Assistant,阿里云Elasticsearch AI 搜索技术实践
本文介绍了阿里云 Elasticsearch 推出的创新型 AI 搜索方案。
986 5

热门文章

最新文章