Elasticsearch全量数据增量遍历实现原理

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 针对凤凰网财经版块的新闻数据和评论数据, 1个索引存储采集凤凰网财经版块的新闻数据;1个索引存储相关的财经数据评论结果。

0、需求

针对凤凰网财经版块的新闻数据和评论数据, 1个索引存储采集凤凰网财经版块的新闻数据;1个索引存储相关的财经数据评论结果。
统计:

1)某条新闻的评论数的多少?
2)某条评论属于哪条新闻?
3)当前已采集数据的所有评论、评论数汇总,按照评论数逆序排序,以便于图形化展示。

1、问题分解

1.1 数据如何存储,方案选型?

方案一:类似需求,1个索引ifeng_index存储新闻数据;1个索引ifeng_comm_index存储评论数据。
二者之间通过唯一值建立关联:评论数据中其来源新闻的唯一id值。
优点:数据分开存储,不存在交叉问题;
缺点:如果实现需求3),会非常复杂,做全局两通道的遍历和统计。

方案二:借助2.X-5.X版本ES中特有的父子文档实现。
注意:由于6.X版本以后,一个索引下只能存储一个type,所以父子文档也就不再可以使用。
所谓父子文档,可以理解为:
1)统一索引下的两个type,1个父type存储凤凰网新闻数据;1个子type存储凤凰网评论数据;
2)其中子type的Mapping定义要特殊处理;
3)其中子type的每条凤凰网评论数据都要关联唯一的父type的相关ID值。
优点:可以以相对较小的复杂度满足需求1)、2)、3)
缺点:以上方案二,ES6.X+都不再适用。

注意:
ES6.X已经移除父子文档的相关实现:http://t.cn/RE07V5A
但是转换为新的join的实现方式:http://t.cn/RE07IG1
综上分析,我是5.X的版本,采取方案二。

1.2 所需要技术支撑

1)父子文档技术
父子文档定义:
用于两个索引之间通过某一个特殊字段建立关联的场景。
在1对多的场景,尤其适用。如:1个父文档下有多个子文档。
缺点: 查询速度会比同等的嵌套查询慢5到10倍,详见:http://t.cn/ROir5rQ

父子文档实现:
建立父-子文档映射关系时只需要指定某一个文档 type 是另一个文档 type 的父亲。 该关系可以在如下两个时间点设置:1)创建索引时;2)在子文档 type 创建之前更新父文档的 mapping。

2)全局遍历技术
借助Scroll实现。之前的博文也有说明,http://t.cn/RE068mD
【scroll机制】:相对于from和size的分页来说,使用scroll可以模拟一个传统数据的游标,记录当前读取的文档信息位置。这个分页的用法,不是为了实时查询数据,而是为了一次性查询大量的数据(甚至是全部的数据)。
假设某索引下共有8个document,document中由code字段标记不同,0,1,2….8区分。

第一次scroll请求中,size设置为3,基于code升序排序,时间戳设置为1min;
第二次基于scroll_id查询,基于scroll_id查询第一次返回结果:0,1,2; 基于scroll_id查询第二次,返回结果:3,4,5; 基于scroll_id查询第三次,返回结果:6,7。
假设仍在的有效时间1min内,继续查询,返回结果:空。
若超时,继续scroll_id请求,则会返回类似如下的错误:

{
  "error": {
  "root_cause": [
  {
  "type": "search_context_missing_exception",
  "reason": "No search context found for id [79110689]"
  },
  {
  "type": "circuit_breaking_exception",
  "reason": "[parent] Data too large, data for [<transport_request>] would be [23195655038/21.6gb], which is larger than the limit of [23190280601/21.5gb]",
  "bytes_wanted": 23195655038,
  "bytes_limit": 23190280601
  },

3)增量遍历统计计数
对修改字段打flag标记,可以通过ES中update_by_query方法,对ES中数据进行更新操作。
如果:不存在flag字段,遍历到该条记录的时候,新增flag字段且flag置为1。
如果:存在flag字段,代表该条记录已经被遍历过。
这样的好处,防止数据被循环遍历。
其实:scroll机制已经预防了这一点,以防万一。

2、具体原理

2.1 scroll实现遍历DSL实现

步骤1:scroll查询。
基于特定的字段进行排序如下:

POST scroll_index/_search?scroll=1m
{
  "size": 3,
  "query": {
  "match_all": {}
  },
  "sort": {
  "code":"asc"
  }
}

返回结果:

{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAFcOGqFllFRGNIdVQ2Ui1LaFJRblJKOWVDNkEAAAAABXDhqxZZRURjSHVUNlItS2hSUW5SSjllQzZBAAAAAAPXB3EWTllZcVhfUlNSRUN5M3NueUJ2YkVXUQAAAAAD1wd0Fk5ZWXFYX1JTUkVDeTNzbnlCdmJFV1EAAAAAA9cHchZOWVlxWF9SU1JFQ3kzc255QnZiRVdR",
  "took": 4,
  "timed_out": false,
  "_shards": {
  "total": 5,
  "successful": 5,
  "failed": 0
  },
  "hits": {
  "total": 8,
  "max_score": null,
  "hits": [
  {
  "_index": "scroll_index",
  "_type": "scroll_type",
  "_id": "0",
  "_score": null,
  "_source": {
  "code": 0
  },
  "sort": [
  0
  ]
  },
  {
  "_index": "scroll_index",
  "_type": "scroll_type",
  "_id": "1",
  "_score": null,
  "_source": {
  "code": 1
  },
  "sort": [
  1
  ]
  },
  {
  "_index": "scroll_index",
  "_type": "scroll_type",
  "_id": "2",
  "_score": null,
  "_source": {
  "code": 2
  },
  "sort": [
  2
  ]
  }
  ]
  }
}

步骤2:基于Scroll_id查询

POST /_search/scroll
{
  "scroll" : "1m",
  "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAFcOGqFllFRGNIdVQ2Ui1LaFJRblJKOWVDNkEAAAAABXDhqxZZRURjSHVUNlItS2hSUW5SSjllQzZBAAAAAAPXB3EWTllZcVhfUlNSRUN5M3NueUJ2YkVXUQAAAAAD1wd0Fk5ZWXFYX1JTUkVDeTNzbnlCdmJFV1EAAAAAA9cHchZOWVlxWF9SU1JFQ3kzc255QnZiRVdR"
}

步骤3:循环执行,直至遍历结束。
主要注意的时间。

2.3 增量部分java如何实现?

在延迟指定时间后以指定的间隔时间循环执行定时任务。
借助Java Timer类实现。 Timer 是一种定时器工具,用来在一个后台线程计划执行指定任务。

  public class TimerTest03 {
        Timer timer;

        public TimerTest03(){
            timer = new Timer();
            timer.schedule(new TimerTaskTest03(), 1000, 2000);
        }

        public static void main(String[] args) {
            new TimerTest03();
        }
    }

    public class TimerTaskTest03 extends TimerTask{

        @Override
        public void run() {
            Date date = new Date(this.scheduledExecutionTime());
            System.out.println("本次执行该线程的时间为:" + date);
        }
    }

2.5 父子文档的实现。

步骤1:定义索引,同常规定义索引一致。
但,要规划好父子文档,通过:type类型区分。
如,父文档的type定义为:fenghuang_type, 子文档定义为ifeng_comm_type。

PUT ifeng_index
{
  "mappings": {
    "ifeng_type": {},
    " ifeng_comm_type ": {
      "_parent": {
        "type": "fenghuang_type"
      }
    }
  }
}

步骤2:插入子文档数据。
父文档数据插入和传统导入数据方式一致。
举例如下:

PUT /ifeng_index/ifeng_comm_type/1?parent=XVFBASDE! 

注意: 父子文档关联建立的关键是:在导入子文档数据的时候通过加入参数parent=父文档的_id。

步骤3:基于父文档查询子文档。

POST ifeng_index/ifeng_comm_type/_search
{
  "query": {
    "has_parent": {
      "type":  "ifeng_type",
      "query": {
        "match": {
          "title": "世锦赛-墨西哥锦标赛移动日精彩集锦"
        }
      }
    }
  }
}

步骤4:基于子文档查询父文档。

POST ifeng_index/ifeng_type/_search
{
  "query": {
    "has_child": {
      "type":  "ifeng_comm_type",
      "score_mode": "max",
      "query": {
        "match": {
          "title": "很精彩,棒极啦!"
        }
      }
    }
  }
}

步骤5: 查看索引中每个父文档下有多少个子文档

POST /ifeng_index/ifeng_type/
{
  "size": 0,
  "aggs": {
  "ifeng_key_agg": {
  "terms": {
  "field": "_key",
  "order": {
  "ifeng_comm_type": "desc"
  }
  },
  "aggs": {
  "ifeng_comm_type": {
  "children": {
  "type": "ifeng_comm_type"
  }
  }
  }
  }
  }
}

3、小结

通过父子文档,实现了1对多数据的关联;
通过scroll可以实现遍历操作,实现全量遍历;
通过scroll+java定时任务Timer,实现增量遍历。


作者:铭毅天下
转载请标明出处,原文地址:
http://blog.csdn.net/laoyang360/article/details/79437408

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
19天前
|
存储 监控 数据挖掘
使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅
【6月更文挑战第9天】Meltano,一个开源数据集成框架,简化了从Snowflake到Elasticsearch的数据迁移。这个工具支持多种数据源,提供易于配置的界面。要开始,需安装Meltano并配置连接信息。一个简单的YAML示例展示了如何定义从Snowflake到Elasticsearch的迁移任务。Meltano自动执行迁移,同时提供监控和日志功能。借助Meltano,用户能高效集成数据,提升搜索和分析能力,适应不断增长的数据需求和挑战。
66 6
|
17天前
|
JSON DataWorks 关系型数据库
DataWorks操作报错合集之同步Elasticsearch数据报错:Cat response did not contain a JSON Array,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1天前
|
存储 Java API
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧———索引与数据上传(二)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧———索引与数据上传(二)
|
30天前
|
运维 数据挖掘 Serverless
阿里云Elasticsearch Serverless助力某电商平台公司实现商品订单数据的实时写入查询
某电商平台公司采用阿里云Elasticsearch Serverless解决方案,实现商品、订单和其他关键信息的写入和查询的实时响应。
230 1
|
14天前
|
存储 负载均衡 NoSQL
【后端面经】【NoSQL】ElasticSearch - 1 - 1 节点角色 写入数据
【6月更文挑战第12天】中间件高可用与高性能通过冗余、负载均衡和优化实现。Elasticsearch节点有候选主节点、协调节点和数据节点等角色,可兼任或独立。数据写入涉及Buffer、Page Cache和磁盘,通过段合并优化资源使用。查询通过Commit Point识别新段,确保近实时搜索。
21 0
|
1月前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
|
1月前
|
canal NoSQL 关系型数据库
实时计算 Flink版产品使用合集之如何在ElasticSearch中查看同步的数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
Java Maven 开发工具
【ElasticSearch 】IK 分词器安装
【ElasticSearch 】IK 分词器安装
44 1
|
15天前
Elasticsearch安装配置文件
Elasticsearch安装配置文件
15 0
|
1月前
|
存储 数据可视化 数据挖掘
【ElasticSearch】ElasticSearch安装
【ElasticSearch】ElasticSearch安装
38 2