关于SolrCore引发的总结---分布式搜索实现

简介: 假期重新把之前在新浪博客里面的文字梳理了下,搬到这里。本文是SolrCore原理分析的连载之一,介绍分布式搜索实现原理。

-Solr3.* 分布式搜索实现分析

本部分承接关于SolrCore引发的总结 详细理解内容见中文注释。

1 总体描述

solr分布式搜索框架基础上(所有请求在管道流程中,进入handlerRequestBody逻辑,handleRequestBody

这里面,执行遍历每个SearchComponentprepareprocessafter三阶段逻辑,并且searchcomponent是按配置顺序来遍历执行),实现上有关键概念:线程、分阶段、链接池三大核心技术来完成分布式搜索。

 

功能的扩展,SearchComponent的 横向扩展(分布式)、纵向扩展(配置多个SearchComponent,会执行循环处理,每个阶段,只是符合特定条件的逻辑才触发)。详细流程可以参考上一部分的关于SolrCore引发的总结的updateRequest 部分的分解。

 

框架的扩展,在handler这一层,handler总领handleRequestBody,进而总领这个handler配置的SearchComponent,而具体的SearchComponent会有本地和分布式两块逻辑。如果是分布式的,在分布式HttpCommComponent 中来协调分阶段循环、线程、连接池的管理


2 详细分析部分

refrence code version solr3.4

org.apache.solr.handler.component 

SearchHandler extends RequestHandlerBase implements SolrCoreAware

@Override

public voidhandleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throwsException, ParseException, InstantiationException, IllegalAccessException

{

// int sleep = req.getParams().getInt("sleep",0);

// if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}

   ResponseBuilder rb = new ResponseBuilder();

   rb.req= req;

   rb.rsp= rsp;

   rb.components= components;

   rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));

   finalRTimer timer = rb.isDebug() ? new RTimer() : null;

   if(timer == null) {

     // non-debugging prepare phase

     for( SearchComponent c : components) {

       c.prepare(rb);

     }

   } else{

     // debugging prepare phase

     RTimer subt = timer.sub( "prepare" );

     for( SearchComponent c : components) {

       rb.setTimer( subt.sub( c.getName() ) );

       c.prepare(rb);

       rb.getTimer().stop();

     }

     subt.stop();

   }

 

   if(rb.shards== null) {

// a normal non-distributed request

// The semantics of debugging vs not debugging are different enough that

// it makes sense to have two control loops

     if(!rb.isDebug()) {

       // Process

       for( SearchComponent c : components ) {

         c.process(rb);

       }

     }

     else{

       // Process

       RTimer subt = timer.sub( "process");

       for( SearchComponent c : components ) {

         rb.setTimer( subt.sub( c.getName() ) );

         c.process(rb);

         rb.getTimer().stop();

       }

       subt.stop();

       timer.stop();

 

       // add the timing info

       if( rb.getDebugInfo() == null ) {

         rb.setDebugInfo( newSimpleOrderedMap

       }

       rb.getDebugInfo().add( "timing", timer.asNamedList() );

     }

 

   } else{

     // a distributed request

//分布式处理部分。comm里面引用clientcompomentprepare 两部分是相同的。在process过程出现分布or 非分布式分支

     HttpCommComponent comm = new HttpCommComponent(); //总管对象,接管请求之前、之后的逻辑,负责网络的任务提交、返回的封装。

     if(rb.outgoing== null) {

       rb.outgoing = new LinkedList();

     }

     rb.finished= newArrayList();

     intnextStage = 0;

     do{

       rb.stage = nextStage;

       nextStage = ResponseBuilder.STAGE_DONE;

//call all components 尽管调用了所有component,但是并不是所有component都有分布式支持

//3.4只有QueryComponentFacetComponent实现了distributedProcess

//其他的searchcomponent 直接返回state.done

//searchcomponentdistributedProcess中生成outgoing list 的元素

//QueryComponet 第一轮循环,STAGE_EXECUTE_QUERY,生成query请求,第二轮循环STAGE_GET_FIELDS 生成query请求。FacetComponet 只有一个阶段 getfields

//这里循环才是change state的入口,由最外层do while 触发。也是理解分阶段并发的关键。

       for( SearchComponent c : components ) {

         // the next stage is the minimum of what all components report

         nextStage = Math.min(nextStage, c.distributedProcess(rb));

       }

       // check the outgoing queue and send requests

       while (rb.outgoing.size() > 0) {

//向所有shard对象执行query请求,也就是第一轮时候,所有shard 都执行相同的 请求,处理query execute,请求状态统一在发起结点控制,等所有shards第一阶段完成后,开始第二阶段的请求,getfields,并执行第二阶段的等待和finished。每一个阶段结束,会执行相应的comm.canceAll ,但是comm没有close。实际上

第一阶段

SearchComponentdistributedProcess,也即

querycomponent 生成request,而facetcomponentdistributed 没有动作

第一阶段提交之后,等待返回,执行handlerResponse任务,会根据srepspurpose决定本阶段的结束的工作,其实,只有getTopIDs 才会执行megeIds,第一阶段createMainqueyr中会设置sreq purpose

第一阶段提交并且全部返回后,这里是来一个response就执行一个。最后,执行finished操作,其中只有在getfiled state也就是第二阶段的getfield的时候才有效。参见QuerycomponentFacetComponentfinished部分

第二阶段,对应do while循环

此时 searchercomponent distrubutedProcess执行了状态的调整和提交本阶段的请求,此时outgingsize 不空了

全部shard都接受到请求,对请求做本阶段提交的waitingcomcancleALLhandlerResponse

接着退出等待,处理finished任务

         // submit all current request tasks at once

         while (rb.outgoing.size() > 0) {

           ShardRequest sreq = rb.outgoing.remove(0);

           sreq.actualShards = sreq.shards;

           if (sreq.actualShards==ShardRequest.ALL_SHARDS) {

             sreq.actualShards = rb.shards;

           }

           sreq.responses = new ArrayList();

//所有shard 都执行提交,但是状体的协调其实都在当前这个分发结点,也就执行merge的结点

           // TODO: map from shard to address[]

           for (String shard : sreq.actualShards) {

             ModifiableSolrParams params = newModifiableSolrParams(sreq.params);

             params.remove(ShardParams.SHARDS);      // not a top-level request

             params.remove("indent");

             params.remove(CommonParams.HEADER_ECHO_PARAMS);

             params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request

             String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);

             if (shardHandler == null) {

               params.remove(CommonParams.QT);

             } else {

               params.set(CommonParams.QT, shardHandler);

             }

             comm.submit(sreq, shard, params);

           }

         }

//提交第一阶段完毕后,outgoing.size=0,开始等待第一阶段的全部resp,并执行componenthandlerResponse,进入第二阶段的query execut的时候,才会有mergerid()处理或者 第三阶段的getfields 阶段createRetrieveDocs 请求之后才有returnFields()

这里是while 循环体内  ShardResponse srsp = comm.takeCompletedOrError();

一直在等待shard的当前阶段的返回。也就是说mergerIdgetFields都是动态执行的。

         // now wait for replies, but if anyone puts more requests on

         // the outgoing queue, send them out immediately (by exiting

         // this loop)

         while (rb.outgoing.size() == 0) {

           ShardResponse srsp = comm.takeCompletedOrError();//等待过程 等待全部提交的返回,可能有慢待?

           if (srsp == null) break;  // no more requests to wait for

           // Was there an exception?  If so, abort everything and

           // rethrow

           if (srsp.getException() != null) {

             comm.cancelAll();//每个阶段的所有pending cancel

             if (srsp.getException() instanceof SolrException) {

               throw (SolrException)srsp.getException();

             } else {

               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());

             }

           }

           rb.finished.add(srsp.getShardRequest());

           // let the components see the responses to the request

           for(SearchComponent c : components) {

             c.handleResponses(rb, srsp.getShardRequest()); //遍历了component了,但是不是合适的purpose,等于没有操作。而purpose在阶段任务提交时做了设置

           }

         }

       }

//其中在getfield阶段时,才有效果。对之前阶段的预分配的doc执行过滤,egtop =10 merger默认10*shard,由于可能重复,此时mergerid 结果小于10*shard

       for(SearchComponent c : components) {

           c.finishStage(rb);

        }

       // we are done when the next stage is MAX_VALUE

     } while(nextStage != Integer.MAX_VALUE);

   }

 }

/////////////////////////////////////////

Solr 搜索框架下的分布式搜索

// TODO: generalize how a comm component can fit into search component framework

// TODO: statics should be per-core singletons

class HttpCommComponent {

  // We want an executor that doesn't take up any resources if

  // it's not used, so it could be created statically for

  // the distributed search component if desired.

  //

  // Consider CallerRuns policy and a lower max threads to throttle

  // requests at some point (or should we simply return failure?)

  staticExecutor commExecutor= newThreadPoolExecutor(

         0,

         Integer.MAX_VALUE,

         5, TimeUnit.SECONDS, // terminate idle threads after 5 sec

         new SynchronousQueue()  // directly hand off tasks

 ); 

  staticHttpClient client; //static 表明一个solr instance 共享一个client,也往往一个jvm对应一个client

  static{

   MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();

   mgr.getParams().setDefaultMaxConnectionsPerHost(20);//链接数有点少啊

//可能导致的一个问题:请求超时,导致connect 链接失败http://blog.csdn.net/duck_genuine/article/details/7916553

   mgr.getParams().setMaxTotalConnections(10000);

   mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);

   mgr.getParams().setSoTimeout(SearchHandler.soTimeout);

   // mgr.getParams().setStaleCheckingEnabled(false);

   client= newHttpClient(mgr);  

 }

 CompletionService completionService = newExecutorCompletionService(commExecutor);

 Set> pending = newHashSet>();

 HttpCommComponent() {

 }

  privatestaticclassSimpleSolrResponse extends SolrResponse {

   longelapsedTime;

   NamedList nl;

   @Override

   publiclonggetElapsedTime() {

     returnelapsedTime;

   }

   @Override

   publicNamedList

     returnnl;

   }

   @Override

   publicvoidsetResponse(NamedList

     nl = rsp;

   }

 }

  voidsubmit(finalShardRequest sreq, final String shard, final ModifiableSolrParams params) {

   Callable task = new Callable() {

     publicShardResponse call() throws Exception {

       ShardResponse srsp = new ShardResponse();

       srsp.setShardRequest(sreq);

       srsp.setShard(shard);

       SimpleSolrResponse ssr = new SimpleSolrResponse();

       srsp.setSolrResponse(ssr);

       long startTime = System.currentTimeMillis();

       try {

         // String url = "http://" + shard + "/select";

         String url = "http://" + shard;

         params.remove(CommonParams.WT); // use default (currently javabin)

         params.remove(CommonParams.VERSION);

         SolrServer server = newCommonsHttpSolrServer(url, client);

         // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");

         // use generic request to avoid extra processing of queries

         QueryRequest req = new QueryRequest(params);

         req.setMethod(SolrRequest.METHOD.POST);

         // no need to set the response parser as binary is the default

         // req.setResponseParser(new BinaryResponseParser());

         // srsp.rsp = server.request(req);

         // srsp.rsp = server.query(sreq.params) 

         ssr.nl = server.request(req);

       } catch (Throwable th) {

         srsp.setException(th);

         if (th instanceof SolrException) {

           srsp.setResponseCode(((SolrException)th).code());

         } else {

           srsp.setResponseCode(-1);

         }

       }

       ssr.elapsedTime = System.currentTimeMillis() - startTime;

       return srsp;

     }

   };

   pending.add( completionService.submit(task) );

 }

 ShardResponse take() {

   while(pending.size() > 0) {

     try{

       Future future = completionService.take();

       pending.remove(future);

       ShardResponse rsp = future.get();

       rsp.getShardRequest().responses.add(rsp);

       if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {

         return rsp;

       }

     } catch(InterruptedException e) {

       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);

     } catch(ExecutionException e) {

       // should be impossible... the problem with catching the exception

       // at this level is we don't know what ShardRequest it applied to

       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);

     }

   }

   returnnull;

 } 

 ShardResponse takeCompletedOrError() {

   while(pending.size() > 0) {

     try{

       Future future = completionService.take();

       pending.remove(future);

       ShardResponse rsp = future.get();

       if (rsp.getException() != null) return rsp; // if exception, return immediately

       // add response to the response list... we do this after the take() and

       // not after the completion of "call" so we know when the last response

       // for a request was received.  Otherwise we might return the same

       // request more than once.

       rsp.getShardRequest().responses.add(rsp);

       if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {

         return rsp;

       }

     } catch(InterruptedException e) {

       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);

     } catch(ExecutionException e) {

       // should be impossible... the problem with catching the exception

       // at this level is we don't know what ShardRequest it applied to

       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);

     }

   }

   returnnull;

 }

  voidcancelAll() {

   for(Future future : pending) {

     // TODO: any issues with interrupting?  shouldn't be if

     // there are finally blocks to release connections.

     future.cancel(true);

   }

 }

 

}

目录
相关文章
|
7月前
|
分布式计算 Ubuntu Hadoop
百度搜索:蓝易云【Ubuntu搭建全分布式Hadoop】
请注意,以上只是概述,并不包含详细的步骤和指令。搭建全分布式Hadoop是一个复杂的过程,需要对Hadoop的架构和配置有深入的理解,并熟悉Linux系统管理。建议在搭建全分布式Hadoop之前,先学习相关知识并查阅官方文档和教程,以确保正确搭建和配置Hadoop集群。
64 0
|
4月前
|
SQL JSON 大数据
ElasticSearch的简单介绍与使用【进阶检索】 实时搜索 | 分布式搜索 | 全文搜索 | 大数据处理 | 搜索过滤 | 搜索排序
这篇文章是Elasticsearch的进阶使用指南,涵盖了Search API的两种检索方式、Query DSL的基本语法和多种查询示例,包括全文检索、短语匹配、多字段匹配、复合查询、结果过滤、聚合操作以及Mapping的概念和操作,还讨论了Elasticsearch 7.x和8.x版本中type概念的变更和数据迁移的方法。
ElasticSearch的简单介绍与使用【进阶检索】 实时搜索 | 分布式搜索 | 全文搜索 | 大数据处理 | 搜索过滤 | 搜索排序
因为一个问题、我新学了一门技术 ElasticSearch 分布式搜索
这篇文章讲述了作者因为一个检索问题而学习了ElasticSearch技术,并分享了排查和解决ElasticSearch检索结果与页面展示不符的过程。
因为一个问题、我新学了一门技术 ElasticSearch 分布式搜索
|
5月前
|
缓存 Devops 微服务
微服务01好处,随着代码越多耦合度越多,升级维护困难,微服务技术栈,异步通信技术,缓存技术,DevOps技术,搜索技术,单体架构,分布式架构将业务功能进行拆分,部署时费劲,集连失败如何解决
微服务01好处,随着代码越多耦合度越多,升级维护困难,微服务技术栈,异步通信技术,缓存技术,DevOps技术,搜索技术,单体架构,分布式架构将业务功能进行拆分,部署时费劲,集连失败如何解决
|
5月前
|
运维 监控 Java
在大数据场景下,Elasticsearch作为分布式搜索与分析引擎,因其扩展性和易用性成为全文检索首选。
【7月更文挑战第1天】在大数据场景下,Elasticsearch作为分布式搜索与分析引擎,因其扩展性和易用性成为全文检索首选。本文讲解如何在Java中集成Elasticsearch,包括安装配置、使用RestHighLevelClient连接、创建索引和文档操作,以及全文检索查询。此外,还涉及高级查询、性能优化和故障排查,帮助开发者高效处理非结构化数据。
79 0
|
7月前
|
分布式计算 Hadoop Java
百度搜索:蓝易云【HBase分布式安装配置教程。】
以上是一个简要的HBase分布式安装和配置教程。需要注意的是,HBase的配置和部署涉及更多的细节和参数设置,取决于你的特定环境和需求。建议你参考HBase官方文档或其他可靠资源,以获得更详细和全面的指导。
80 6
|
Web App开发 Docker 容器
百度搜索:蓝易云【用docker搭建selenium grid分布式环境实践】
通过这些步骤,您可以使用Docker搭建Selenium Grid分布式环境,并在多个节点上并行运行Selenium测试。根据实际需求,您还可以进行更高级的配置和扩展,如增加更多的节点、配置浏览器版本等。
72 1
|
存储 索引
ES 分布式搜索的运行机制
ES 分布式搜索的运行机制
58 1
ES 分布式搜索的运行机制
|
前端开发
47分布式电商项目 - 商品关键字搜索
47分布式电商项目 - 商品关键字搜索
46 0
47分布式电商项目 - 商品关键字搜索
|
消息中间件 搜索推荐 索引
57分布式电商项目 - ActiveMQ 实现运营商后台与搜索服务的零耦合(二)
57分布式电商项目 - ActiveMQ 实现运营商后台与搜索服务的零耦合(二)
154 0