你想拥有自己的搜索引擎吗?-elasticsearch篇

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 分布式系统的集群方式大致可以分为主从(Master-Slave)模式和无主模式。elasticsearch、HDFS、HBase使用主从模式,Cassandra使用无主模式。主从模式可以简化系统设计,Master作为权威节点,部分操作由Master执行,并负责维护集群元信息。缺点是Master节点存在单点故障,需要解决容灾问题,并且集群规模会受限于Master节点的管理功能。从集群节点角色的角度划分,至少存在主节点和数据节点,另外还有协调节点,预处理节点和部落节点。

背景

现有某大型政务系统,采用微服务架构,目前正在运行的节点数约有150个,为了更好的监控系统内各节点的运行情况,就需要搜集每个节点的运行日志,目前每天产生的日志量约为1G,如此庞大的数据量,使用传统的数据库来存储显然是不现实的,现我们需要一个可以具有大存储、高并发、可拓展、秒级查询能力的高可用的分布式日志系统。普元技术中台的日志模块基于分布式搜索引擎elasticsearch实现了大数据量日志的存储、查询和管理等功能,elasticsearch的分布式架构特性为我们构建分布式日志系统提供了基础保障。下面就让我们来聊聊elasticsearch的分布式架构吧。

1. elasticsearch集群基本概念和原理

1.1 集群节点角色

主节点(Master node)

主节点负责集群层面上的操作,集群管理等,通过配置node.mastertrue(默认) 使这个节点具有Master节点的资格,主节点是全局唯一的,是在具有Master节点资格中选举。

主节点也可以作为数据节点,但是尽量不要这么做,生成环境中,尽量分离主节点和数据节点,创建独立的主节点配置如下:

node.master: true

node.data: false

数据节点(Data node)

负责保存数据、执行CRUD、搜索、聚合等操作。数据节点对I/O、内存、CPU要求较高。一般情况下数据读写只和Data node交互,不和Master node交互


node.master: false

node.data: true

node.ingest: false

协调节点(Coordinating node)

客户端请求可以发送到集群的任何节点,每个节点知道任意文档所处的位置,然后转发这些请求,收集数据并且返回给客户端,处理客户端请求的节点称为协调节点。

协调节点将请求转发给数据节点。每个数据节点在本地执行请求,并将结果返回给协调节点。协调节点将每个数据节点返回的结果合并,对结果进行排序,这个排序过程需要很多CPU和内存资源


node.master: false

node.data: false

node.ingest: false

预处理节点(Ingest node)

es写入数据之前,通过预定义好的processorspipeline对数据进行过滤、转换。processorspipeline拦截bulkindex请求,在应用相关操作后将文档传回indexbulk api;默认情况下节点上启用ingest,如果想在某个节点上关闭ingest,则设置node.ingest: false


node.master: false

node.data: false

node.ingest: false

1.2 集群健康状态

从数据完成性角度划分,集群健康状态分为三种:

· Green 所有的主分片和副分片都正常运行

· Yellow 所有的主分片都正常运行,但是副分片不全是正常运行的,意味着存在单节点故障的风险

· Red 有主分片没有正常运行

每个索引也有这三种状态,假如丢失了一个副分片,该分片所属的索引和整个集群变为Yellow状态,其他的索引为Green

1.3 主要内部模块简介

Cluster

Cluster模块是主节点执行集群管理的封装实现,管理集群状态,维护集群层面的集群信息。

主要功能:

· 管理集群状态,将新生成的集群状态发布到集群所有节点。

· 在集群各个节点中直接迁移分片,保持数据平衡。

· 调用allocation模块执行分片分配,决策哪些分片应该分配到哪个节点

Discovery

发现模块负责发现集群中的节点,以及选举主节点。当节点加入或退出集群,主节点会采取相应的行动。从某种角度来说,发现模块起到类似Zookeeper的作用,选主并管理集群拓扑。

Indices

索引模块管理全局级的索引设置,不包括索引级的(索引设置分为全局级和每个索引级),它还封装了索引数据恢复功能。集群启动阶段需要的主分片恢复和副分片恢复就是这个模块实现的

gateway

负责对收到Master 广播下的clutser state 数据的持久化存储,并且在集群重启时恢复它们。

allocation

封装了分片分配相关的功能和策略,包括主分片的分配和副分片的分配,本模块主要是节点的调用,创建新索引、集群完全重启都需要分片分配的过程。

HTTP

http模块允许通过JSON over HTTP的方式访问ES APIHTTP模块的本质是完全异步,这意味着没有阻塞线程等待响应,使用异步通信进行HTTP的好处是解决10K量级的并发连接

Transport

Transport模块用于集群内节点之间的内部通信。从一个节点到另外一个节点每个内部请求都使用Transport模块。

如同HTTP模块,Transport模块本质也是完全异步的,Transport模块使用的是TCP通信,每个节点都与其他的节点维持若干TCP长连接,内部节点间的通信都是Transport模块承载的。

软件设计经常依赖于抽象,IoC就是很好的实现方式,并且在内部实现了对象的创建和管理,ES使用的Guice框架进行模块化管理。GuiceGoogle开的的轻量级依赖注入框架。

1.4 模块结构

Guice框架下,模块是由ServiceModule类组成的,Service实现业务功能,Module类配置绑定信息。

AbstractModuleGuice提供的基类,模块需要从这个类继承。Module类主要作用用于定义绑定关系,例如:


 protectedvoidconfigure() {

       bind(GatewayAllocator.class).asEagerSingleton();

       bind(AllocationService.class).toInstance(allocationService);

       bind(ClusterService.class).toInstance(clusterService);

       bind(NodeConnectionsService.class).asEagerSingleton();

       bind(MetaDataCreateIndexService.class).asEagerSingleton();

       bind(MetaDataDeleteIndexService.class).asEagerSingleton();

       bind(MetaDataIndexStateService.class).asEagerSingleton();

       bind(MetaDataMappingService.class).asEagerSingleton();

       bind(MetaDataIndexAliasesService.class).asEagerSingleton();

       bind(MetaDataUpdateSettingsService.class).asEagerSingleton();

       bind(MetaDataIndexTemplateService.class).asEagerSingleton();

       bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);

       bind(RoutingService.class).asEagerSingleton();

       bind(DelayedAllocationService.class).asEagerSingleton();

       bind(ShardStateAction.class).asEagerSingleton();

       bind(NodeMappingRefreshAction.class).asEagerSingleton();

       bind(MappingUpdatedAction.class).asEagerSingleton();

       bind(TaskResultsService.class).asEagerSingleton();

       bind(AllocationDeciders.class).toInstance(allocationDeciders);

       bind(ShardsAllocator.class).toInstance(shardsAllocator);

   }

1.5 模块管理

定义好模块由ModulesBuilder类统一管理,ModulesBuilderESGuice封装,内部调用Guice接口,主要对外提供两个方法。

· add方法:添加创建好的模块

· createInjector方法:调用Guice.createInjector创建返回Injector,后续调用Injector获取相应的Service类的实例。

使用ModulesBuilder进行管理的代理示例:


ModulesBuildermodules=newModulesBuilder();

//Cluster模块

ClusterModuleclusterModule=newClusterModule();

modules.add(clusterModule);

//....

//创建Injector

Injectorinjector=modules.createInjector();

setGatewayAllocator(injector.getInstance(GatewayAllocator.class))

模块化的封装让ES易于扩展,插件本身也是模块,节点启动时被模块管理器添加进来。

2. 集群启动流程

集群启动期间要经历选举主节点、主分片、数据恢复等阶段,梳理其中的原理和细节,对于解决或避免集群遇到问题如脑裂、无主、恢复慢、丢数据有很大的作用。

 

2.1 选取主节点

ES集群启动首先从活跃的机器列表中选取一个作为主节点,选主之后的流程由主节点触发。ES的选主算法是基于Bully算法,主要是对节点的ID进行排序,取ID值最大的节点作为Master,每个节点都运行这个流程。选主的目的是确定唯一的主节点。

节点ID排序选举算法的条件:

1)参选数过半,达到quorum(多数)后选出临时的主节点。

2)得票数需要过半。某个节点被选为主节点,必须判断加入它的节点数过半,才确认Master身份。

3)当检查到节点离开是,需要判断当前节点是否过半。如果达不到quorum,则放弃Master,重新加入集群。如果不这么做容易产生双主,俗称脑裂。

集群不知道共有多少节点,quorum从配置中读取,设置配置项:


discovery.zen.minimun_master_node

2.2 选取集群元信息

被选出的master和集群元数据信息的新旧程度没有关系。它的第一任务是选举元信息,让各节点把各自的存储的元信息发送过来,根据_version版本号来确定最新的元信息,然后把信息broadcast下去,这样集群的所有节点都有最新的元信息。

集群元信息的选举包括两个级别:集群级和索引级。不包含哪个shard存在哪个node节点这种信息。信息以节点磁盘存储为准,需要上报。读写流程不经过master节点,master不知道各个shard副本直接的数据差异。HDFS也是类似的机制,block块信息依赖于DataNode的上报。

为了集群的一致性,参与选举的元信息数量需要过半,Master发布集群状态成功的规则也是等待发布成功的节点数过半。

在选举过程中,不接受新节点的加入请求,集群元信息选举完毕后,Master发布首次集群状态,然后开始选举shard级的元信息。

2.3 allocation过程

选举shard级元信息,构建路由表信息,是在allocation模块中完成。初始阶段,所有的shard都处于unassigned状态。ES中通过分配过程决定哪个分片位于哪个节点,重构内容路由表。此时,首先要做的是分配主分片。

1.选主shard

某个主分片(sent[0])是如何分配的?

首先分配工作都是Master来做的,此时Master不知道主分片在哪,它向集群的所有节点询问:sent[0]分片元信息发送给我,master等待所有请求返回,正常情况下就有了shard的信息,然后根据某种策略选出一个分片为主分片。这种询问量=shard X 节点数。所有我们需要控制shard分片数别太大。

考虑哪个分片作为主分片?

ES5.x以下版本通过对比shard级元信息的版本号来确定。

ES5.x以后开始使用给每个shard设置一个UUID,然后在集群级的元信息中记录哪个shard是最新的,主分片选举过程是通过集群级元信息中记录最新主分片列表来确定主分片的。

如果集群设置了:


cluster.routing.allocation.enable: none

禁止分配分片,集群仍会强制分配分片,设置改选项,集群重启后状态为Yellow,而非Red

2.选副shard

主分片选举完成后,从上一个过程汇总的shard信息中选择一个副本作为副分片。如果汇总信息不存在,则分配一个全新副本的操作依赖于延迟配置项:


index.unassigned.node_left.delayed_timeout

2.4 index recovery

分片分配成功后进入recovery流程。主shardrecovery不会等待其副分片分配成功才开始recovery,它是独立的流程,只是副分片的recovery需要主分片恢复完毕才开始。

为什么需要recovery

对于主分片来说,可能一些数据没有flush;对于副分片来说,一是没有flush,二是主分片写完了,副分片还没的及写,造成主副分片数据不一致。

1.shard recovery

由于每次写操作都会记录translog,事务日志中记录了什么操作和相关的数据。因此将最后一次提交(Lucene的一次提交就是一次fsync刷盘的过程)之后的translog中,建立Lucene索引,如此完成主分片的recovery

2.shard recovery

阶段1:在主分片所在的节点上,获取translog保留锁,从获取保留锁开始,会保留translog不受其刷盘清空的影响。然后调用Lucene接口把shard做快照,这是已经刷盘中的分片数据。把这些shard数据复制到副本节点。在阶段1完毕前,向副本分片发送告知对方启动engine,在阶段2开始之前,副分片就可以正常处理写请求。

阶段2:对tanslog做快照,这个快照包含阶段1开始,到执行translog快照期间的新增索引。将这些translog发送到副分片所在的节点进行重写。

3. 选主流程

3.1 为什么使用主从模式

除了主从模式外,另外一种选择的是分布式哈希模式,可以支持每小时数千个节点的加入和离开,其可以在不了解底层网络拓扑的异构网络中工作,查询响应时间为410跳(中转次数),例如Cassandra就使用这种方案。但是在相稳定的对等网络中,主从模式会更好。

ES的典型应用场景的另一个简化是集群中没有那么多节点。通常,节点的数量远远小于单个节点能够维护的连接数,并且网络环境不必经常处理节点的加入和离开,这就是为什么主从模式更适合ES

3.2 流程分析

整体流程可以概括:选举临时的Master,如果本节点当选,则等待确立Master,如果其他的节点当选,则尝试加入集群,然后启动节点失效侦察。具体如下图所示:

文章2.png

临时Master选举过程如下:

1ping 所有的节点,获取节点列表fullPingResponsesping结果不包括本节点

2)构建两个列表。

activeMasters列表:存储集群当前活跃Master列表。遍历第一步获取的所有节点,将每个节点所认为的当前Master节点加入activeMasters列表中(不包括本节点)。在遍历过程中,如果配置了discovery.zen.master_election.ingore_non_master_pingstrue(默认为false),而节点又不具备Master资格,则跳过该节点。具体流程如下图:

文章3.png

masterCandidates列表:存储master候选者列表。遍历第一步获取列表,去掉不具备Master资格的节点,添加到这个列表中。

(3)如果activeMasters为空,则从masterCandidates中选举,结果可能选举成功,也可能选举失败。如果不为空,则activeMasters中选择最合适的作为Master。流程图如下:

文章4.png

masterCandidates中选主具体细节封装在ElectMasterService类中

//MasterCandidate中选主时,首先判断当前候选数是否达到法定数,否则选主失败

publicbooleanhasEnoughCandidates(Collection<MasterCandidate>candidates) {

       //候选者为空,返回失败

       if (candidates.isEmpty()) {

           returnfalse;

       }

       //默认值为-1 确保单节点的集群可以正常选主

       if (minimumMasterNodes<1) {

           returntrue;

       }

       returncandidates.size() >=minimumMasterNodes;

   }


//当候选数达到额定数后,从候选者中选一个作为Master

publicMasterCandidateelectMaster(Collection<MasterCandidate>candidates) {

       asserthasEnoughCandidates(candidates);

       List<MasterCandidate>sortedCandidates=newArrayList<>(candidates);

       //通过自定义的比较函数对候选者节点从小到大排序

       sortedCandidates.sort(MasterCandidate::compare);

       //返回最新的最为Master

       returnsortedCandidates.get(0);

}

4. 网关模块

gateway模块负责集群元信息的存储和集群重启时的恢复。

ES中存储的数据有下面几种:

· state元数据信息

· index Lucene生成的索引文件

· translog事务日志

分别对应ES中的数据结构:

· MetaData(集群层),主要是ClusterUUIDsettingstemplates

· IndexMetaData(索引层),主要是numberOfShardsmappings

· ShardStateMetaData(分片层),主要是versionindexUUIDprimary等。

持久化的state不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠gatewayrecovery过程重建RoutingTable。当读取某个文档时,根据路由算法确定目的的分片后,从RoutingTable中查找分片位于哪个节点,然后将请求转发到目的节点。

4.1 元数据持久化

只有具备Master资格的节点和Data Node可以持久化集群状态。当收到主节点发布的集群状态时,节点判断元信息是否发生变化,如果发生变化则将其持久到磁盘中。

GatewayMetaState负责收集集群状态,当收到新的集群状态时,ClusterApplierService通知全部的applier应用该集群状态:


 privatevoidcallClusterStateAppliers(ClusterChangedEventclusterChangedEvent){

//遍历全部的applier,依次调用各模块对集群状态的处理        

    clusterStateAppliers.forEach(applier-> {

           try {

//调用各个模块实现的applyClusterState

               applier.applyClusterState(clusterChangedEvent);

           } catch (Exceptionex) {

               logger.warn("failed to notify ClusterStateApplier", ex);

           }

       });

   }

执行文件写入的过程封装在MetaDataStateFormat中,全局元信息和索引级元信息的写入都执行三个流程:写临时文件、刷盘、move成目标文件。


 publicfinalvoidwrite(finalTstate, finalPath... locations) throwsIOException {

       if (locations==null) {

           thrownewIllegalArgumentException("Locations must not be null");

       }

       if (locations.length<=0) {

           thrownewIllegalArgumentException("One or more locations required");

       }

       finallongmaxStateId=findMaxStateId(prefix, locations)+1;

       assertmaxStateId>=0 : "maxStateId must be positive but was: ["+maxStateId+"]";

       finalStringfileName=prefix+maxStateId+STATE_FILE_EXTENSION;

       PathstateLocation=locations[0].resolve(STATE_DIR_NAME);

       Files.createDirectories(stateLocation);

       finalPathtmpStatePath=stateLocation.resolve(fileName+".tmp");

       finalPathfinalStatePath=stateLocation.resolve(fileName);

       try {

           finalStringresourceDesc="MetaDataStateFormat.write(path=\""+tmpStatePath+"\")";

           try (OutputStreamIndexOutputout=

                    newOutputStreamIndexOutput(resourceDesc, fileName, Files.newOutputStream(tmpStatePath), BUFFER_SIZE)) {

               CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);

               out.writeInt(format.index());

               try (XContentBuilderbuilder=newXContentBuilder(format, newIndexOutputOutputStream(out) {

                   @Override

                   publicvoidclose() throwsIOException {

                   } })) {


                   builder.startObject();

                   {

                       toXContent(builder, state);

                   }

                   builder.endObject();

               }

               CodecUtil.writeFooter(out);

           }

           IOUtils.fsync(tmpStatePath, false); // fsync the state file

           Files.move(tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE);

           IOUtils.fsync(stateLocation, true);

           for (inti=1; i<locations.length; i++) {

               stateLocation=locations[i].resolve(STATE_DIR_NAME);

               Files.createDirectories(stateLocation);

               PathtmpPath=stateLocation.resolve(fileName+".tmp");

               PathfinalPath=stateLocation.resolve(fileName);

               try {

                   Files.copy(finalStatePath, tmpPath);

                   // we are on the same FileSystem / Partition here we can do an atomic move

                   Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE);

                   IOUtils.fsync(stateLocation, true); // we just fsync the dir here..

               } finally {

                   Files.deleteIfExists(tmpPath);

               }

           }

       } finally {

           Files.deleteIfExists(tmpStatePath);

       }

       cleanupOldFiles(prefix, fileName, locations);

   }          

4.2 元数据恢复

Gatewayrecovery负责找到正确的元数据,应用到集群。

当前集群完成重启,达到recovery条件时,进入元数据恢复流程,一般情况下,recovery条件由以下三个配置控制。

· gateway.expected_nodes,预期的节点数。加入集群的节点数(数据节点或具备Master资格的节点)达到这个数量后立即开始gateway的恢复。默认为0

· gateway.recover_after_time,如果没有达到预期的节点数量,则恢复过程将等待配置的时间,再尝试恢复,默认为5分钟

· gateway.recover_after_nodes,只要配置数量的节点(数据节点或具备Master资格的节点)加入集群就可以开始恢复

当集群级、索引级元数据选举完毕后,执行submitStateUpdateTask提交一个source的任务,触发获取shard级元数据的操作,这个Fetch过程是异步的,根据集群分片数量规模,Fetch过程可能比较长,然后submit任务就结束,gateway流程结束。

4.3 选举集群级和索引级元数据

进入recovery主要流程:代码实现GateWay#performStateRecovery中;首先向Master资格的节点发起请求,获取他们的存储的元数据


//具有Master资格的节点列表

String[] nodesIds=clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);

//发送获取请求并等待结果

       TransportNodesListGatewayMetaState.NodesGatewayMetaStatenodesState=listGatewayMetaState.list(nodesIds, null).actionGet();

选举集群级元数据代码如下:


publicvoidperformStateRecovery(finalGatewayStateRecoveredListenerlistener) throwsGatewayException {

   //遍历请求的所有节点

   for (TransportNodesListGatewayMetaState.NodeGatewayMetaStatenodeState : nodesState.getNodes()) {

           if (nodeState.metaData() ==null) {

               continue;

           }

           found++;

   //根据元信息中记录的版本号选举元信息

           if (electedGlobalState==null) {

               electedGlobalState=nodeState.metaData();

           } elseif (nodeState.metaData().version() >electedGlobalState.version()) {

               electedGlobalState=nodeState.metaData();

           }

           for (ObjectCursor<IndexMetaData>cursor : nodeState.metaData().indices().values()) {

               indices.addTo(cursor.value.getIndex(), 1);

           }

       }

}

选举索引级元数据代码如下:


publicvoidperformStateRecovery(finalGatewayStateRecoveredListenerlistener) throwsGatewayException {

   finalObject[] keys=indices.keys;

   //遍历集群中的全部索引

       for (inti=0; i<keys.length; i++) {

           if (keys[i] !=null) {

               Indexindex= (Index) keys[i];

               IndexMetaDataelectedIndexMetaData=null;

               intindexMetaDataCount=0;

               //遍历请求的全部节点,对特定索引选择版本号最高的作为该索引的元数据

               for (TransportNodesListGatewayMetaState.NodeGatewayMetaStatenodeState : nodesState.getNodes()) {

                   if (nodeState.metaData() ==null) {

                       continue;

                   }

                   IndexMetaDataindexMetaData=nodeState.metaData().index(index);

                   if (indexMetaData==null) {

                       continue;

                   }

                   if (electedIndexMetaData==null) {

                       electedIndexMetaData=indexMetaData;

                   } elseif (indexMetaData.getVersion() >electedIndexMetaData.getVersion()) {

                       electedIndexMetaData=indexMetaData;

                   }

                   indexMetaDataCount++;

               }

           }

       }

}

5. 集群模块

集群模块封装了集群层面要执行的任务,把分片分配给节点属于集群层面的工作,在节点间迁移分片保持数据平衡,集群健康、集群级元信息管理,以及节点管理都属于集群层面的工作。

5.1 集群状态

集群状态在ES中封装为ClusterState类。可以通过_cluster/state API来获取集群状态


curl-X GET "localhost:9200/_cluster/state/"

响应信息中提供了集群名称、集群状态的总压缩大小(下发到数据节点时被压缩的)和集群状态本身,请求时可以通过设置过滤器来获取特定内容。

默认请求下,Coordinating node在收到这个请求后,会把请求路由到Master node上执行,确保获取最新集群状态。可以通过在请求中添加"local=true"参数,让接受请求的节点返回本地的集群状态。

5.2 内部封装和实现

MasterServiceClusterApplierService分布负责运行任务和应用任务产生的集群

5.2.1 MasterService

MasterService类负责集群任务管理、运行等工作。其对外提供提交任务接口,内部维护一个线程池运行这些任务,对外提供主要接口如下:

方法

简介

numberOfPendingTasks

返回待执行的任务数量

pendingTasks

返回待执行的任务列表

submitStateUpdateTasks

提交任务集群

主要数据成员如下:

成员

简介

clusterStatePublisher

发布集群任务的模块

clusterStateSupplier

存储集群状态

slowTaskLoggingThreshold

集群任务慢执行的检测

threadPoolExecutor

执行集群任务的线程池

taskBatcher

管理、执行提交的任务,通过submitStateUpdateTask方法提交调用内部类Batcher的提交方法

5.2.2 ClusterApplierService

ClusterApplierService类负责管理需要对集群任务进行处理的模块(Applier)和监听器(Listener),以及通知各个Applier应用集群状态,其对外提供接受集群状态的接口,当传输模块接受到集群状态时,调用这个接口将集群状态传递过来,内部维护一个线程池用于应用集群状态。对外提供主要接口如下:

方法

简介

addStateApplier

添加一个集群状态的处理器

addListener

添加一个集群状态的监听器

removeApplier

删除一个集群状态的处理器

removeListener

删除一个集群状态的监听器

state

返回集群状态

onNewClusterState

收到新的集群状态

submitStateUpdateTask

在新的线程池应用集群状态

主要数据成员如下表示:

成员

简介

clusterSettings

保存要通知的集群状态应用处理器

clusterStateListeners

保存集群状态监听器

state

保存最后的集群状态

threadPoolExecutor

应用集群任务集群的线程池

5.3 内部模块如何提交任务

内部模块通过clusterService.submitStateUpdateTask来提交一个任务集群。

· ClusterStateTaskListener:提交任务时实现一些回调函数,例如对任务处理失败、集群状态处理完毕时的处理。

· CLusterStateTaskExecutor:主要定义要执行的任务。每个任务在执行时会传入当前集群状态,任务执行完毕返回新产生的集群状态,如果没有产生新的集群状态,则返回原集群状态实例。

· ClusterStateTaskConfig:任务的配置信息,包括超时和优先级。


clusterService.submitStateUpdateTask("allocation indices ",newClusterStateUpdateTask{

   //实现要执行的具体任务,任务返回新的集群状态

   publicClusterStateexecute(ClusterStatecurrentState) {

       

   }

   //任务执行失败的回调

   publicvoidonFailure(Stringsource, Exceptione){

       

   }

   //集群状态处理完毕的回调,当集群状态已经被全部的ApplierListener处理完成时调用

   publicvoidclusterStateProcessed(Stringsource, ClusterStateoldState, ClusterStatenewState) {

   }

   

})

5.4 集群状态发布

发布集群状态是一个分布式事务操作,分布式事务需要实现原子性:要么所有参与者都提交事务,要么都取消事务。ES使用二段提交来实现分布式事务。二段提交可以避免失败回滚,其基本过程是:把信息发下去,但是不应用,如果得到多数节点确认,则再发一个请求出去要求节点应用。

ES实现二段提交与标准二段提交有些区别,发布集群状态到参与者的数量并非定义为全部,而是多数节点成功就算成功。多数的定义取决于配置项:


discovery.zen.minimum_master_nodes

两个阶段过程如下:

· 发布阶段:发布集群状态,等待响应

· 提交阶段:收到的响应数量大于minimum_master_nodes数量,发送commit请求。

主节点吧集群状态发下去,节点收到后不应用,当节点在discovery.zen.commit_timeout超时时间内,收到节点的确认数量达到discovery.zen.minimum_master_nodes-1(去掉本节点),主节点开始发送提交请求,如果节点discovery.zen.commit_timeout超时时间内没有收到主节点提交请求,则拒绝该集群状态。当节点收到节点的提交请求后,开始应用集群状态。主节点等待响应,直到收到全部响应,整个流程发布介绍。

6. 总结

由于篇幅有限,本篇文件简单给大家介绍elasticsearch集群基本概念、elasticsearch分布式架构中一些模块的设计以及核心源码的分析,为自己构建一个简单的分布式搜索系统提供一些参考方向,不再需要将过多的精力放在日志中心本身的高可用性和查询优化上,可以无感知的增加或减少集群的节点,从而做到更高效的进行日志的业务开发工作。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
3月前
|
自然语言处理 搜索推荐 数据库
高性能分布式搜索引擎Elasticsearch详解
高性能分布式搜索引擎Elasticsearch详解
95 4
高性能分布式搜索引擎Elasticsearch详解
|
2月前
|
自然语言处理 搜索推荐 关系型数据库
elasticsearch学习六:学习 全文搜索引擎 elasticsearch的语法,使用kibana进行模拟测试(持续更新学习)
这篇文章是关于Elasticsearch全文搜索引擎的学习指南,涵盖了基本概念、命令风格、索引操作、分词器使用,以及数据的增加、修改、删除和查询等操作。
35 0
elasticsearch学习六:学习 全文搜索引擎 elasticsearch的语法,使用kibana进行模拟测试(持续更新学习)
|
2月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
192 0
|
2月前
|
自然语言处理 搜索推荐 Java
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(一)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图
58 0
|
2月前
|
存储 自然语言处理 搜索推荐
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
40 0
|
7月前
|
存储 自然语言处理 搜索推荐
分布式搜索引擎ElasticSearch
Elasticsearch是一款强大的开源搜索引擎,用于快速搜索和数据分析。它在GitHub、电商搜索、百度搜索等场景中广泛应用。Elasticsearch是ELK(Elasticsearch、Logstash、Kibana)技术栈的核心,用于存储、搜索和分析数据。它基于Apache Lucene构建,提供分布式搜索能力。相比其他搜索引擎,如Solr,Elasticsearch更受欢迎。倒排索引是其高效搜索的关键,通过将词条与文档ID关联,实现快速模糊搜索,避免全表扫描。
283 12
|
6月前
|
存储 搜索推荐 关系型数据库
【搜索引擎】elastic search核心概念
【搜索引擎】elastic search核心概念
54 0
|
7月前
|
监控 搜索推荐 安全
面经:Elasticsearch全文搜索引擎原理与实战
【4月更文挑战第10天】本文是关于Elasticsearch面试准备的博客,重点讨论了四个核心主题:Elasticsearch的分布式架构和数据模型、CRUD操作与查询DSL、集群管理与性能优化,以及安全与插件扩展。文中通过代码示例介绍了如何进行文档操作、查询以及集群管理,并强调理解Elasticsearch的底层原理和优化策略对面试和实际工作的重要性。
73 6
|
7月前
|
监控 数据可视化 搜索推荐
初识Elasticsearch:打造高效全文搜索与数据分析引擎
【4月更文挑战第7天】Elasticsearch,一款由Elastic公司开发的分布式搜索引擎,以其全文搜索和数据分析能力在全球范围内广泛应用。它基于Apache Lucene,支持JSON,适用于日志分析、监控等领域。Elasticsearch的亮点包括:精准快速的全文搜索,通过倒排索引和分析器实现;强大的数据分析与实时响应能力,提供丰富聚合功能;弹性扩展和高可用性,适应水平扩展和故障恢复;以及完善的生态系统,与Kibana、Logstash等工具集成,支持多种编程语言。作为大数据处理的重要工具,Elasticsearch在企业级搜索和数据分析中扮演关键角色。
208 1
|
7月前
|
存储 搜索推荐 Java
Java远程连接本地开源分布式搜索引擎ElasticSearch
Java远程连接本地开源分布式搜索引擎ElasticSearch
下一篇
DataWorks