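Solr 3.* distributed search: implementation analysis
This part continues the earlier summary prompted by SolrCore. The detailed walkthrough is in the annotations added inside the code.
1 Overview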
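Built on Solr's distributed search framework, the implementation rests on three core techniques: threads, staged execution, and a connection pool. In that framework every request travels through the pipeline into handleRequestBody, which runs each SearchComponent through its prepare, process, and after phases, visiting the components in their configured order.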
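Functional extension: SearchComponent scales horizontally (distributed execution) and vertically (several SearchComponents can be configured and are processed in a loop; at each stage only the logic whose conditions are met is triggered). For the detailed flow, see the breakdown of the updateRequest part in the previous installment, the summary prompted by SolrCore.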
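Framework extension: at the handler layer, the handler owns handleRequestBody and, through it, the SearchComponents configured for that handler; each SearchComponent in turn has a local part and a distributed part. For distributed requests, HttpCommComponent coordinates the staged loop, the threads, and the connection pool.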
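The staged loop described above can be sketched in isolation. This is a minimal illustration, not Solr's code: the `Component` interface, the `QueryLike` and `Passive` classes, and the numeric stage values are hypothetical stand-ins; only the idea that the next stage is the minimum of what all components report, and that the loop ends at `Integer.MAX_VALUE`, comes from the source analysed below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StagedLoopSketch {
    // Illustrative stage values (assumed for this sketch).
    static final int STAGE_START = 0;
    static final int STAGE_EXECUTE_QUERY = 2000;
    static final int STAGE_GET_FIELDS = 3000;
    static final int STAGE_DONE = Integer.MAX_VALUE;

    // Hypothetical stand-in for SearchComponent.distributedProcess.
    interface Component {
        int distributedProcess(int stage, List<String> outgoing);
    }

    // Queues one request per stage, like a query-style component.
    static class QueryLike implements Component {
        public int distributedProcess(int stage, List<String> outgoing) {
            if (stage < STAGE_EXECUTE_QUERY) return STAGE_EXECUTE_QUERY;
            if (stage == STAGE_EXECUTE_QUERY) { outgoing.add("query"); return STAGE_GET_FIELDS; }
            if (stage == STAGE_GET_FIELDS) { outgoing.add("fields"); return STAGE_DONE; }
            return STAGE_DONE;
        }
    }

    // A component without distributed logic reports "done" right away.
    static class Passive implements Component {
        public int distributedProcess(int stage, List<String> outgoing) {
            return STAGE_DONE;
        }
    }

    // Runs the do-while loop and records which request was queued in which stage.
    static List<String> run(List<Component> components) {
        List<String> log = new ArrayList<String>();
        int nextStage = STAGE_START;
        do {
            int stage = nextStage;
            nextStage = STAGE_DONE;
            List<String> outgoing = new ArrayList<String>();
            for (Component c : components) {
                // the next stage is the minimum of what all components report
                nextStage = Math.min(nextStage, c.distributedProcess(stage, outgoing));
            }
            for (String request : outgoing) {
                log.add(stage + ":" + request);
            }
        } while (nextStage != STAGE_DONE);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.<Component>asList(new QueryLike(), new Passive())));
    }
}
```

Because the passive component always reports `STAGE_DONE`, it never holds the loop back; the component that still has work to do determines the next stage.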
2 Detailed analysis
Reference code version: Solr 3.4
org.apache.solr.handler.component
SearchHandler extends RequestHandlerBase implements SolrCoreAware
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException
{
// int sleep = req.getParams().getInt("sleep",0);
// if (sleep > 0) {log.error("SLEEPING for " + sleep); Thread.sleep(sleep);}
ResponseBuilder rb = new ResponseBuilder();
rb.req = req;
rb.rsp = rsp;
rb.components = components;
rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));
final RTimer timer = rb.isDebug() ? new RTimer() : null;
if (timer == null) {
// non-debugging prepare phase
for( SearchComponent c : components) {
c.prepare(rb);
}
} else {
// debugging prepare phase
RTimer subt = timer.sub( "prepare" );
for( SearchComponent c : components) {
rb.setTimer( subt.sub( c.getName() ) );
c.prepare(rb);
rb.getTimer().stop();
}
subt.stop();
}
if (rb.shards == null) {
// a normal non-distributed request
// The semantics of debugging vs not debugging are different enough that
// it makes sense to have two control loops
if (!rb.isDebug()) {
// Process
for( SearchComponent c : components ) {
c.process(rb);
}
}
else {
// Process
RTimer subt = timer.sub( "process");
for( SearchComponent c : components ) {
rb.setTimer( subt.sub( c.getName() ) );
c.process(rb);
rb.getTimer().stop();
}
subt.stop();
timer.stop();
// add the timing info
if( rb.getDebugInfo() == null ) {
rb.setDebugInfo( new SimpleOrderedMap<Object>() );
}
rb.getDebugInfo().add( "timing", timer.asNamedList() );
}
} else {
// a distributed request
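// Distributed handling. comm holds a reference to the client. The components' prepare phase is identical in both cases; the distributed vs. non-distributed branch only appears in the process phase.
HttpCommComponent comm = new HttpCommComponent(); // The coordinator object: it takes over the logic before and after each request, submitting the network tasks and wrapping the replies.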
if (rb.outgoing == null) {
rb.outgoing = new LinkedList<ShardRequest>();
}
rb.finished = new ArrayList<ShardRequest>();
int nextStage = 0;
do {
rb.stage = nextStage;
nextStage = ResponseBuilder.STAGE_DONE;
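// call all components. Every component is called, but not every component supports distributed processing.
// In 3.4 only QueryComponent and FacetComponent implement distributedProcess;
// the other SearchComponents simply return STAGE_DONE.
// The elements of the outgoing list are created inside each SearchComponent's distributedProcess.
// QueryComponent: in the first round (STAGE_EXECUTE_QUERY) it generates the query request; in the second round (STAGE_GET_FIELDS) it generates a request again, this time to fetch fields. FacetComponent has only one stage, get fields.
// This loop is the real entry point for state changes, driven by the outermost do-while. It is also the key to understanding staged concurrency.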
for( SearchComponent c : components ) {
// the next stage is the minimum of what all components report
nextStage = Math.min(nextStage, c.distributedProcess(rb));
}
// check the outgoing queue and send requests
while (rb.outgoing.size() > 0) {
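// Send the request to every shard: in the first round all shards execute the same request and handle query execution.
// Request state is controlled centrally on the initiating node. Once every shard has finished the first stage, the
// second-stage request (get fields) is sent, followed by that stage's waiting and finishing steps.
// comm.cancelAll is only invoked when a shard response carries an exception, and comm itself is never closed.
// First stage:
// the SearchComponents' distributedProcess runs, i.e.
// QueryComponent generates the request, while FacetComponent's distributedProcess does nothing.
// After the first stage is submitted, we wait for replies and run handleResponses; the purpose of the sreq decides what end-of-stage work is done. In fact only the get-top-ids purpose triggers mergeIds; createMainQuery sets the sreq's purpose during the first stage.
// Responses to the first-stage requests are handled one by one as each arrives. Finally the finishStage step runs, which only has an effect in the get-fields state, i.e. the second stage. See finishStage in QueryComponent and FacetComponent.
// Second stage, corresponding to the next iteration of the do-while loop:
// by now the SearchComponents' distributedProcess has advanced the state and queued this stage's requests, so outgoing is no longer empty.
// All shards receive the request; for this stage's submission we again go through waiting, comm.cancelAll (on error only) and handleResponses,
// then leave the wait loop and run the finishStage work.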
// submit all current request tasks at once
while (rb.outgoing.size() > 0) {
ShardRequest sreq = rb.outgoing.remove(0);
sreq.actualShards = sreq.shards;
if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
sreq.actualShards = rb.shards;
}
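sreq.responses = new ArrayList<ShardResponse>();
// Every shard gets the request, but state coordination stays on the dispatching node, which is also the node that performs the merge.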
// TODO: map from shard to address[]
for (String shard : sreq.actualShards) {
ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
params.remove(ShardParams.SHARDS); // not a top-level request
params.remove("indent");
params.remove(CommonParams.HEADER_ECHO_PARAMS);
params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
if (shardHandler == null) {
params.remove(CommonParams.QT);
} else {
params.set(CommonParams.QT, shardHandler);
}
comm.submit(sreq, shard, params);
}
}
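// Once the first stage has been submitted, outgoing.size() == 0, so we start waiting for all first-stage responses and run each component's handleResponses. mergeIds() is only applied to the responses of the query-execute stage, and returnFields() only to the responses of the createRetrieveDocs request issued in the get-fields stage.
// This happens inside the while loop body below: ShardResponse srsp = comm.takeCompletedOrError();
// keeps waiting for the shards' replies for the current stage. In other words, mergeIds and returnFields are executed dynamically, as the replies come in.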
// now wait for replies, but if anyone puts more requests on
// the outgoing queue, send them out immediately (by exiting
// this loop)
while (rb.outgoing.size() == 0) {
ShardResponse srsp = comm.takeCompletedOrError(); // blocks until every submitted request of a ShardRequest has returned; could a slow shard stall this?
if (srsp == null) break; // no more requests to wait for
// Was there an exception? If so, abort everything and
// rethrow
if (srsp.getException() != null) {
comm.cancelAll(); // cancel everything still pending in this stage
if (srsp.getException() instanceof SolrException) {
throw (SolrException)srsp.getException();
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
}
}
rb.finished.add(srsp.getShardRequest());
// let the components see the responses to the request
for(SearchComponent c : components) {
c.handleResponses(rb, srsp.getShardRequest()); // every component is visited, but if the purpose does not match this is a no-op; the purpose was set when the stage's request was created
}
}
}
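// Only has an effect in the get-fields stage: it filters the docs that were provisionally allocated in the earlier stage. E.g. with top = 10 the merge considers up to 10 * shards candidates by default; since the same doc may come back from more than one shard, the mergeIds result can hold fewer than 10 * shards entries.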
for(SearchComponent c : components) {
c.finishStage(rb);
}
// we are done when the next stage is MAX_VALUE
} while(nextStage != Integer.MAX_VALUE);
}
}
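The effect of merging per-shard top lists and dropping duplicates can be illustrated with a small standalone sketch. It is not Solr's mergeIds: the `ShardDoc` class and `mergeTopIds` method below are hypothetical, and keeping the highest-scoring copy of a duplicate id is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeIdsSketch {
    // Hypothetical (id, score) pair as returned by one shard.
    static class ShardDoc {
        final String id;
        final float score;
        ShardDoc(String id, float score) { this.id = id; this.score = score; }
    }

    // Merges the per-shard top lists, removes duplicate ids, returns the top `rows` ids.
    static List<String> mergeTopIds(List<List<ShardDoc>> perShard, int rows) {
        Map<String, ShardDoc> best = new HashMap<String, ShardDoc>();
        for (List<ShardDoc> shardDocs : perShard) {
            for (ShardDoc d : shardDocs) {
                ShardDoc seen = best.get(d.id);
                // Assumption of this sketch: keep the highest-scoring copy of a duplicate.
                if (seen == null || d.score > seen.score) {
                    best.put(d.id, d);
                }
            }
        }
        List<ShardDoc> merged = new ArrayList<ShardDoc>(best.values());
        Collections.sort(merged, new Comparator<ShardDoc>() {
            public int compare(ShardDoc a, ShardDoc b) {
                int byScore = Float.compare(b.score, a.score); // descending score
                return byScore != 0 ? byScore : a.id.compareTo(b.id);
            }
        });
        List<String> ids = new ArrayList<String>();
        for (ShardDoc d : merged.subList(0, Math.min(rows, merged.size()))) {
            ids.add(d.id);
        }
        return ids;
    }

    // Two shards, each returning two candidates; "d2" appears on both.
    static List<String> demo(int rows) {
        List<ShardDoc> shardA = Arrays.asList(new ShardDoc("d1", 0.9f), new ShardDoc("d2", 0.5f));
        List<ShardDoc> shardB = Arrays.asList(new ShardDoc("d2", 0.7f), new ShardDoc("d3", 0.6f));
        return mergeTopIds(Arrays.asList(shardA, shardB), rows);
    }

    public static void main(String[] args) {
        System.out.println(demo(2));  // prints [d1, d2]
        System.out.println(demo(10)); // prints [d1, d2, d3]
    }
}
```

With `rows = 10` the two shards supply four candidates, yet only three distinct ids survive, which is the "fewer than rows * shards" effect noted in the comment before finishStage.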
/////////////////////////////////////////
Distributed search within the Solr search framework
// TODO: generalize how a comm component can fit into search component framework
// TODO: statics should be per-core singletons
class HttpCommComponent {
// We want an executor that doesn't take up any resources if
// it's not used, so it could be created statically for
// the distributed search component if desired.
//
// Consider CallerRuns policy and a lower max threads to throttle
// requests at some point (or should we simply return failure?)
static Executor commExecutor = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
new SynchronousQueue<Runnable>() // directly hand off tasks
);
static HttpClient client; // static: one Solr instance shares a single client, which usually means one client per JVM
static {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(20); // this connection limit looks rather low
// One possible consequence: requests time out and connections fail, see http://blog.csdn.net/duck_genuine/article/details/7916553
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
// mgr.getParams().setStaleCheckingEnabled(false);
client = new HttpClient(mgr);
}
CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
HttpCommComponent() {
}
private static class SimpleSolrResponse extends SolrResponse {
long elapsedTime;
NamedList<Object> nl;
@Override
public long getElapsedTime() {
return elapsedTime;
}
@Override
public NamedList<Object> getResponse() {
return nl;
}
@Override
public void setResponse(NamedList<Object> rsp) {
nl = rsp;
}
}
void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
Callable<ShardResponse> task = new Callable<ShardResponse>() {
public ShardResponse call() throws Exception {
ShardResponse srsp = new ShardResponse();
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
long startTime = System.currentTimeMillis();
try {
// String url = "http://" + shard + "/select";
String url = "http://" + shard;
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
SolrServer server = new CommonsHttpSolrServer(url, client);
// SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
req.setMethod(SolrRequest.METHOD.POST);
// no need to set the response parser as binary is the default
// req.setResponseParser(new BinaryResponseParser());
// srsp.rsp = server.request(req);
// srsp.rsp = server.query(sreq.params)
ssr.nl = server.request(req);
} catch (Throwable th) {
srsp.setException(th);
if (th instanceof SolrException) {
srsp.setResponseCode(((SolrException)th).code());
} else {
srsp.setResponseCode(-1);
}
}
ssr.elapsedTime = System.currentTimeMillis() - startTime;
return srsp;
}
};
pending.add( completionService.submit(task) );
}
ShardResponse take() {
while(pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch(InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch(ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
ShardResponse takeCompletedOrError() {
while(pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
if (rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same
// request more than once.
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch(InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch(ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
void cancelAll() {
for (Future<ShardResponse> future : pending) {
// TODO: any issues with interrupting? shouldn't be if
// there are finally blocks to release connections.
future.cancel(true);
}
}
}
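The submit / take pattern used by HttpCommComponent can be tried out with the JDK alone. The sketch below is an illustration under stated assumptions: `FanOutSketch` and `queryAll` are hypothetical names, and the task merely builds a string where the real code performs an HTTP request. `Executors.newCachedThreadPool()` is used because it is the same kind of executor as the one above (zero core threads, unbounded maximum, a `SynchronousQueue` hand-off), only with a 60-second idle timeout instead of 5.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutSketch {
    // Submits one task per shard, then collects the replies in completion order.
    static List<String> queryAll(List<String> shards) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<String> completionService = new ExecutorCompletionService<String>(pool);
        Set<Future<String>> pending = new HashSet<Future<String>>();
        try {
            for (final String shard : shards) {
                // Stand-in for the HTTP call made in HttpCommComponent.submit.
                pending.add(completionService.submit(new Callable<String>() {
                    public String call() {
                        return "response from " + shard;
                    }
                }));
            }
            List<String> responses = new ArrayList<String>();
            while (!pending.isEmpty()) {
                // take() blocks until some task finishes and hands back the
                // same Future object that submit() returned for it.
                Future<String> done = completionService.take();
                pending.remove(done);
                responses.add(done.get());
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Replies arrive in completion order, so the printed order may vary.
        System.out.println(queryAll(Arrays.asList("host1:8983/solr", "host2:8983/solr")));
    }
}
```

Tracking the futures in a `pending` set is what makes a cancelAll-style operation possible: anything still in the set has not been taken yet and can be cancelled.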