ElasticSearch Bulk 源码解析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 对于RPC类的调用,我会在后文简单提及,只是endpoint不一样,内部处理逻辑还是一样的。这篇只会讲IndexRequest,其他如DeleteRequest,UpdateRequest之类的,我们暂时不涉及。
本来应该先有这篇文章,后有 如何提高ElasticSearch 索引速度才对。不过当时觉得后面一篇文章会更有实际意义一些,所以先写了后面那篇文章。结果现在这篇文章晚了20多天。
前言
读这篇文章前,建议先看看ElasticSearch Rest/RPC 接口解析,有利于你把握ElasticSearch接受处理请求的脉络。对于RPC类的调用,我会在后文简单提及,只是endpoint不一样,内部处理逻辑还是一样的。这篇只会讲IndexRequest,其他如DeleteRequest,UpdateRequest之类的,我们暂时不涉及。

类处理路径

RestBulkAction -> 
            TransportBulkAction -> 
                       TransportShardBulkAction
其中TransportShardBulkAction比较特殊,有个继承结构:
   TransportShardBulkAction < TransportReplicationAction < TransportAction
主入口是TransportAction,具体的业务逻辑实现分布到子类(TransportReplicationAction)和孙子类(TransportShardBulkAction)里了。
另外,我们也会提及org.elasticsearch.index.engine.Engine相关的东西,从而让大家清楚的了解ES是如何和Lucene关联上的。

RestBulkAction

入口自然是org.elasticsearch.rest.action.bulk.RestBulkAction,一个请求会构建一个BulkRequest对象,BulkRequest.add方法会解析你提交的文本。对于类型为index或者create的(还记得bulk提交的文本格式是啥样子的么?),都会被构建出IndexRequest对象,这些解析后的对象会被放到BulkRequest对象的属性requests里。当然如果是update,delete等则会构建出其他对象,但都会放到requests里。
public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest {
    //这个就是前面提到的requests
    final List<ActionRequest> requests = new ArrayList<>();  

//这个复杂的方法就是通过http请求参数解析出
//IndexRequest,DeleteRequest,UpdateRequest等然后放到requests里
public BulkRequest add(BytesReference data, 
@Nullable String defaultIndex, 
@Nullable String defaultType, 
@Nullable String defaultRouting, 
@Nullable String[] defaultFields, 
@Nullable Object payload, boolean allowExplicitIndex) throws Exception {
        XContent xContent = XContentFactory.xContent(data);
        int line = 0;
        int from = 0;
        int length = data.length();
        byte marker = xContent.streamSeparator();
        while (true) {
接着通过NodeClient将请求发送到TransportBulkAction类(回忆下之前文章里提到的映射关系,譬如  TransportAction,两层映射关系解析  )。对应的方法如下:
//这里的client其实是NodeClient
client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
TransportBulkAction
看这个类的签名:
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
实现了HandledTransportAction,说明这个类同时也是RPC接口的逻辑处理类。如果你点进HandledTransportAction就能看到ES里经典的messageReceived方法了。这个是题外话
该类对应的入口是:
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
这里的bulkRequest 就是前面RestBulkAction组装好的。该方法第一步是判断是不是需要自动建索引,如果索引不存在,就自动创建了。
接着通过executeBulk方法进入原来的流程。在该方法中,对bulkRequest.requests 进行了两次for循环。
第一次判定如果是IndexRequest就调用IndexRequest.process方法,主要是为了解析出timestamp,routing,id,parent 等字段。
第二次是为了对数据进行分拣。大致是为了形成这么一种结构:
//这里的BulkItemRequest来源于 IndexRequest等
Map[ShardId, List[BulkItemRequest]]
接着对新形成的这个结构(ShardId -> List[BulkItemRequest])做循环,也就是针对每个ShardId里的数据进行统一处理。有了ShardId,bulkRequest,List[BulkItemRequest]等信息后,统一封装成BulkShardRequest。从名字看就很好理解,就是对属于同一ShardId的数据构建一个新的类似BulkRequest的对象。
接着就到TransportShardBulkAction,TransportReplicationAction,TransportAction 三代人出场了:
//这里的shardBulkAction 是TransportShardBulkAction
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
TransportReplicationAction/TransportShardBulkAction
TransportAction是一个通用的主类,具体逻辑还是其子类来实现。虽然前面提到shardBulkAction是TransportShardBulkAction,但其实流程逻辑还是TransportReplicationAction来完成的。入口在该类的doExecute方法:
@Override
    protected void doExecute(Request request, ActionListener<Response> listener) {
        new PrimaryPhase(request, listener).run();
    }
我们知道在ES里有主从分片的概念,所以一条数据被索引后需要经过两个阶段:
  1. 将数据写入Primary(主分片)
  2. 将数据写入Replication(从分片)
至于为什么不直接从Primary进行复制,而是将数据分别写入到Primary和Replication我觉得主要考虑如果一旦Primary是损坏的,不至于影响到Replication(考虑下,如果Primary是损坏的文件,然后所有的Replication如果是直接复制过来,就都坏了)。
又扯远了。我们看到doExecute 首先是进入PrimaryPhase阶段,也就是写主分片。

Primary Phase

在PrimaryPhase.doRun方法里,你会看到两行代码
final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
final ShardRouting primary = resolvePrimary(shardIt);
其中这个ShardIterator是类似 shardId->ShardGroup 的结构。不管这个shardId是什么,它一定是个Replication或者Primary的shardId, ShardGroup 就是Replication和Primary的集合。resolvePrimary方法则是遍历这个集合,然后找出Primary的过程。
知道Primary后就可以判断是转发到别的Node或者直接在本Node处理了:
routeRequestOrPerformLocally(primary, shardIt);
如果Primary就在本节点,直接就处理了:
//我去掉了一些无关代码哈
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                try {
                    threadPool.executor(executor).execute(new AbstractRunnable() {
                         @Override
                        protected void doRun() throws Exception {
                            performOnPrimary(primary, shardsIt);
                        }
            }
这里用上了线程池。前面对每个shardId对应的数据集合做处理,其实是顺序循环执行的,这里实现了将数据处理异步化。
在performOnPrimary方法中,BulkShardRequest被转化成了PrimaryOperationRequest,理由也很简单,更加specific了,因为就是针对主分片的Request。接着进入shardOperationOnPrimary 方法,该方法是在孙子类TransportShardBulkAction类里实现的。
protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(
ClusterState clusterState, 
PrimaryOperationRequest shardRequest) {
到该方法,有两个比较重要的概念会出现:
//伟大的版本号,实现了对并发修改的支持
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
//事物日志,为Shard Recovery以及
//避免过多的Index Commit做出突出贡献,
//同时也是是实现了GetById的实时性
Translog.Location location = null;
上面两个概念成就了ES从一个简单的全文检索引擎到类No-SQL的转型(好吧,我好像又扯远了)
接着就是for循环了:
//这里的request是BulkShardRequest
//对应的items则是BulkItemRequest集合
for (int requestIndex = 0;
 requestIndex < request.items().length; 
requestIndex++) {
循环会根据BulkItemRequest的不同类型而有了分支。其实就是
IndexRequest,DeleteRequest,UpdateRequest,我们这里依然只讨论IndexRequest。如果发现BulkItemRequest是IndexRequest,进行如下操作:
WriteResult<IndexResponse> result = shardIndexOperation(request, 
indexRequest, 
clusterState, 
indexShard, 
true);
shardIndexOperation里嵌套的核心方法是executeIndexRequestOnPrimary,该方法第一步是获取到Operation对象,
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Engine对象是比较底层的一个对象了,是对Lucene的IndexWriter,Searcher之类的封装。这里的Engine.IndexingOperation对应的是Create或者Index类。你可以把这两个类理解为待索引的Document,只是还带上了动作。
第二步是判断索引的Mapping是不是要动态更新,如果是,则更新。
第三步执行实际的建索引操作:
final boolean created = operation.execute(indexShard);

operation.execute 额外引出的话题

我们会暂时深入到operate.execute方法里,但这个不是主线,看完后记得回到上面那行代码上。
刚才我们说了operation可能是Create或者Index,我们会以Create为主线进行分析。所谓Create和Index,你可以理解为一个待索引的Document,只是带上动作的语义。
上面对应的execute 方法签名是:
@Overridepublic boolean execute(IndexShard shard) {     shard.create(this);   
 return true;
}
我们看到这里是反向调用indexShard对象的create方法来进行索引的创建。我们来看看IndexShard的create方法:
//我依然做了删减,体现一些核心代码
public void create(Engine.Create create) {        
        engine().create(create);
    }
engine()方法返回的是InternalEngine实例,InternalEngine .innerCreate方法执行到构建索引的操作。这个方法值得分析一下,所以我就贴了一坨的代码。
private void innerCreate(Create create) throws IOException {
        if (engineConfig.isOptimizeAutoGenerateId() && create.autoGeneratedId() && !create.canHaveDuplicates()) {
            // We don't need to lock because this ID cannot be concurrently updated:
            innerCreateNoLock(create, Versions.NOT_FOUND, null);
        } else {
            synchronized (dirtyLock(create.uid())) {
                final long currentVersion;
                final VersionValue versionValue;
                versionValue = versionMap.getUnderLock(create.uid().bytes());
                if (versionValue == null) {
                    currentVersion = loadCurrentVersionFromIndex(create.uid());
                } else {
                    if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                        currentVersion = Versions.NOT_FOUND; // deleted, and GC
                    } else {
                        currentVersion = versionValue.version();
                    }
                }
                innerCreateNoLock(create, currentVersion, versionValue);
            }
        }
    }
首先,如果满足如下三个条件就无需进行版本检查:
  1. index.optimize_auto_generated_id 被设置为true(默认是false,话说注释上说是默认是true,但是我看着觉得像是false)
  2. id设置为自动生成(没有人工设置id)
  3. create.canHaveDuplicates == false ,该参数一般是false
提这个是主要为了说明,譬如一般的运维日志啥的,就不要自己生成ID了,采用自动生成的ID,可以跳过版本检查,从而提高入库的效率。
第二个指的说的是,如果对应文档在缓存中没有找到(versionMap),那么就会由如下的代码执行实际磁盘查询操作:
currentVersion = loadCurrentVersionFromIndex(create.uid());
通过对比create对象里的版本号和从索引文件里加载的版本号 ,最终决定是进行update还是create操作。
在innerCreateNoLock 方法里,你会看到熟悉的Lucene操作,譬如:
indexWriter.addDocument(index.docs().get(0));
//或者
indexWriter.updateDocument(index.uid(), index.docs().get(0));
现在回到TransportShardBulkAction的主线上。执行完下面的代码后:
final boolean created = operation.execute(indexShard);
就能获得对应文档的版本等信息,这些信息会更新对应的IndexRequest等对象。
到目前为止,Primay Phase 完成,接着开始Replication Phase
replicationPhase = new ReplicationPhase(shardsIt, 
primaryResponse.v2(), 
primaryResponse.v1(), 
observer, 
primary, 
internalRequest, 
listener, 
indexShardReference);
finishAndMoveToReplication(replicationPhase);
最后一行代码会启动replicationPhase阶段。

Replication Phase

Replication Phase 流程大致和Primary Phase 相同,就不做过详细的解决,我这里简单提及一下。
ReplicationPhase的doRun方法是入口,核心方法是performOnReplica,如果发现Replication  shardId所属的节点就是自己的话,异步执行shardOperationOnReplica,大体逻辑如下:
threadPool.executor(executor).execute(new AbstractRunnable() {
                        @Override
                        protected void doRun() {
                            try {
                                shardOperationOnReplica(shard.shardId(), replicaRequest);
                                onReplicaSuccess();
                            } catch (Throwable e) {
                                onReplicaFailure(nodeId, e);
                                failReplicaIfNeeded(shard.index(), shard.id(), e);
                            }
                        }
在Replication阶段,shardOperationOnReplica 该方法完成了索引内容解析,mapping动态新增,最后进入索引(和就是前面提到的operation.execute)等动作,所以还是比Primary 阶段更紧凑些。
另外,在Primary Phase 和 Replication Phase, 一个BulkShardRequest 处理完成后(也就是一个Shard 对应的数据集合)才会刷写Translog日志。所以如果发生数据丢失,则可能是多条数据。

总结

这篇文章以流程分析为主,很多细节我们依然没有讲解详细,比如Translog和Version。这些争取能够在后续文章中进一步阐述。另外错误之处在所难免,请大家在评论处提出。
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
86 2
|
9天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
9天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
9天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
57 12
|
29天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
10天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
2月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
61 3
|
3月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
68 5

推荐镜像

更多