logstash同步父子文档

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: logstash同步父子文档

父子文档,可以理解为关系型数据库中的一对多的关系。使用logstash同步MySQL数据,有时候需要同步父子文档。

使用ES父子文档功能需要满足两个条件

  • 指定一个type是另一个type的父
  • 在存储子文档的时候,通过parent参数指定父文档的Id


第一步:定义父子关系


curl -XPUT "http://localhost:9200/product?pretty" -d '

{
  "mappings": { 
    "service_judge" : { }, 
    "service_judge_detail":{ 
      "_parent": { 
        "type": "service_judge"}}
}


第二步:配置文件


input{
     jdbc {
         jdbc_driver_library => "/some/config-dir/mysql-connector-java-5.1.46-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://localhost:3306/product"
         jdbc_user => "user"
         jdbc_password => "123456"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "SELECT * FROM  service_judge WHERE `status` = 1"
         type => "service_judge"
       }
       jdbc {
         jdbc_driver_library => "/some/config-dir/mysql-connector-java-5.1.46-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://localhost:3306/product"
         jdbc_user => "user"
         jdbc_password => "123456"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "SELECT * FROM  service_judge_detail WHERE `status` = 1"
         type => "service_judge_detail"
       } 
} 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output{
      stdout {
        codec => json_lines
      }
      if[type] == "service_judge"{
      elasticsearch {
         hosts => "localhost:9200"
         user => "elastic"
         password => "123456"
         index => "service_judge"
         document_type => "service_judge"
         document_id => "%{id}"
      }
      }
      if[type] == "service_judge_detail"{
      elasticsearch {
         hosts => "localhost:9200"
         user => "elastic"
         password => "123456"
         index => "service_judge"
         document_type => "service_judge_detail"
         document_id => "%{id}"
         # 这里通过parent指定父文档:参数为子表中存的父表的id
         parent => "%{service_judge_id}"
      }
      }
 } 


查询验证


查询:评价标签带有非常差的评价

GET product/service_judge/_search
{
  "query": {
    "has_child": {
      "type": "service_judge_detail",
      "query": {
        "match": {
          "content": "非常差"
        }
      }
    }
  }
}


结果:


{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "service_judge",
        "_type": "service_judge",
        "_id": "2c94bf8362ffe67d016300f3b2ca0001",
        "_score": 1,
        "_source": {
          "@timestamp": "2018-05-07T10:40:00.076Z",
          "service_id": "4028b8815ca0a5ca015ca0a67ad90000",
          "name": "衣柜送货管家评价",
          "@version": "1",
          "memo": "评价33444",
          "addname": "1",
          "id": "2c94bf8362ffe67d016300f3b2ca0001",
          "target_role": "003",
          "type": "service_judge",
          "value": null,
          "status": "1"
        }
      }
    ]
  }
}



相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
10月前
|
数据库 索引
elasticsearch中join类型数据如何进行父子文档查询?
elasticsearch中join类型数据如何进行父子文档查询?
|
28天前
|
消息中间件 存储 Kafka
深入解析Kafka中的动态更新模式
深入解析Kafka中的动态更新模式
20 0
|
8月前
|
关系型数据库 MySQL 数据库
淘东电商项目(42) -利用Logstash自动同步数据库内容到ES(多文件方式)
淘东电商项目(42) -利用Logstash自动同步数据库内容到ES(多文件方式)
48 0
|
12月前
|
存储 JSON 缓存
K8s日志组件-Loki是如何存储数据的?
日志记录本质上是一个事件。大多数语言、应用程序框架或库都支持日志,表现形式可以是字符串这样原始的非结构化数据,也可以是JSON等半结构化数据。开发者可以通过日志来分析应用的执行状况,报错信息,分析性能…… 正因为日志极其灵活,生成非常容易,没有一个统一的结构,所以它的体量也是最大的。
447 0
|
缓存 中间件
【Flume中间件】(8)channel选择器副本机制
【Flume中间件】(8)channel选择器副本机制
173 0
【Flume中间件】(8)channel选择器副本机制
|
存储 监控 中间件
【Flume中间件】(2)实时监听一个文件末尾产生的数据
【Flume中间件】(2)实时监听一个文件末尾产生的数据
95 0
【Flume中间件】(2)实时监听一个文件末尾产生的数据
|
监控 中间件
【Flume中间件】(11)聚合组
【Flume中间件】(11)聚合组
56 0
|
消息中间件 存储 缓存
Kafka概念及组件介绍
1、分布式消息队列系统,先入先出,同时提供数据分布式缓存功能 2、消息持久化:数据读取速度可以达到O(1)——预读,后写(按顺序,ABCDE,正读A,预读B;尾部追加写)对磁盘的顺序访问比内存访问还快)
379 1
Kafka概念及组件介绍
|
算法 Java Nacos
Raft采用日志复制形式同步数据|学习笔记
快速学习Raft采用日志复制形式同步数据
182 0
Raft采用日志复制形式同步数据|学习笔记
|
存储 负载均衡 监控
Elasticsearch文档读写模型实现原理
Elasticsearch文档读写模型实现原理
Elasticsearch文档读写模型实现原理