logstash同步父子文档

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: logstash同步父子文档

父子文档,可以理解为关系型数据库中的一对多的关系。使用logstash同步MySQL数据,有时候需要同步父子文档。

使用ES父子文档功能需要满足两个条件

  • 指定一个type是另一个type的父
  • 在存储子文档的时候,通过parent参数指定父文档的Id


第一步:定义父子关系


curl -XPUT "http://localhost:9200/product?pretty" -d '

{
  "mappings": { 
    "service_judge" : { }, 
    "service_judge_detail":{ 
      "_parent": { 
        "type": "service_judge"}}
}


第二步:配置文件


input{
     jdbc {
         jdbc_driver_library => "/some/config-dir/mysql-connector-java-5.1.46-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://localhost:3306/product"
         jdbc_user => "user"
         jdbc_password => "123456"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "SELECT * FROM  service_judge WHERE `status` = 1"
         type => "service_judge"
       }
       jdbc {
         jdbc_driver_library => "/some/config-dir/mysql-connector-java-5.1.46-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://localhost:3306/product"
         jdbc_user => "user"
         jdbc_password => "123456"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "SELECT * FROM  service_judge_detail WHERE `status` = 1"
         type => "service_judge_detail"
       } 
} 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output{
      stdout {
        codec => json_lines
      }
      if[type] == "service_judge"{
      elasticsearch {
         hosts => "localhost:9200"
         user => "elastic"
         password => "123456"
         index => "service_judge"
         document_type => "service_judge"
         document_id => "%{id}"
      }
      }
      if[type] == "service_judge_detail"{
      elasticsearch {
         hosts => "localhost:9200"
         user => "elastic"
         password => "123456"
         index => "service_judge"
         document_type => "service_judge_detail"
         document_id => "%{id}"
         # 这里通过parent指定父文档:参数为子表中存的父表的id
         parent => "%{service_judge_id}"
      }
      }
 } 


查询验证


查询:评价标签带有非常差的评价

GET product/service_judge/_search
{
  "query": {
    "has_child": {
      "type": "service_judge_detail",
      "query": {
        "match": {
          "content": "非常差"
        }
      }
    }
  }
}


结果:


{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "service_judge",
        "_type": "service_judge",
        "_id": "2c94bf8362ffe67d016300f3b2ca0001",
        "_score": 1,
        "_source": {
          "@timestamp": "2018-05-07T10:40:00.076Z",
          "service_id": "4028b8815ca0a5ca015ca0a67ad90000",
          "name": "衣柜送货管家评价",
          "@version": "1",
          "memo": "评价33444",
          "addname": "1",
          "id": "2c94bf8362ffe67d016300f3b2ca0001",
          "target_role": "003",
          "type": "service_judge",
          "value": null,
          "status": "1"
        }
      }
    ]
  }
}



相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
XML Java UED
使用 Spring Boot 实现重试和补偿功能:从理论到实践
【6月更文挑战第17天】在分布式系统中,服务之间的调用可能会因为网络故障、服务器负载等原因偶尔失败。为了提高系统的可靠性和稳定性,我们经常需要实现重试和补偿功能。
422 6
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错之从specified offset启动没有问题,但是停机后从savepoint恢复则报错binlog被purge(实际文件还在),如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
JSON API 数据格式
基于若依的ruoyi-nbcio流程管理系统增加仿钉钉流程设计(一)
基于若依的ruoyi-nbcio流程管理系统增加仿钉钉流程设计(一)
256 1
|
存储 索引
Elasticsearch索引之嵌套类型:深度剖析与实战应用
Elasticsearch索引之嵌套类型:深度剖析与实战应用
|
JSON 监控 Java
Java中如何解决JsonProcessingException异常?
Java中如何解决JsonProcessingException异常?
|
存储 消息中间件 Kafka
logstash简明实用教程
Logstash 是开源的服务器端**数据处理管道**,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。 官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。 Logstash 能够动态地采集、转换和传输数
832 0
logstash简明实用教程
|
分布式计算 MaxCompute
DataHub常见问题之同步篇
介绍DataHub同步的常见问题
4508 0
DataHub常见问题之同步篇
|
缓存 Java Maven
SpringBoot应用篇@Value配置自动刷新能力扩展实践| 8月更文挑战
在我们的日常开发中,使用@Value来绑定配置属于非常常见的基础操作,但是这个配置注入是一次性的,简单来说就是配置一旦赋值,则不会再修改; 通常来讲,这个并没有什么问题,基础的 SpringBoot 项目的配置也基本不存在配置变更,如果有使用过 SpringCloudConfig 的小伙伴,会知道@Value可以绑定远程配置,并支持动态刷新 接下来本文将通过一个实例来演示下,如何让@Value注解支持配置刷新;本文将涉及到以下知识点
1700 1
SpringBoot应用篇@Value配置自动刷新能力扩展实践| 8月更文挑战
|
Java 测试技术
一款自动生成单元测试的 IDEA 插件,开发效率提升 70% 以上!
一款自动生成单元测试的 IDEA 插件,开发效率提升 70% 以上!
1022 0
一款自动生成单元测试的 IDEA 插件,开发效率提升 70% 以上!