关于SolrCore引发的总结--持续更新 这篇博客作为一个总体描述的开始
关于SolrCore引发的总结---分布式搜索实现 围绕分布式部分做说明
下面针对本地 searchComponent做一个总结。针对lucene3.4源码。
概述内容
SearchComponent 的角色:
在urlpath 映射下searcherhandler 下面,配置并使用。也就是说SearcherHandler管理一序列SearchComponent。配置参考solrConfig.xml。eg mabi 不能贴代码,真恶心。
Default configuration in a requestHandler would look like:
<searchCompoent name="query" class="solr.QueryCompoent">
<facet mlt 类似
highlight类似
stats类似
debug类似
<searchComponent name="mySearchComponent" class="my.mycompoent">
再看SearchComponent 二个重要的抽象方法prepare、process。另外distributedProcess、finishStage为处理分布式请求服务。
注意:在分布式处理请求中,定义的components,其中并不是每个searchComponent具有分布式服务支持。QueryComponent、FacetComponent是主要关联分布式的。也潜在说明:components配置本身是多个的、挨个执行,并且components又具有分布式、横向关系。也就是说查询的本地、远程这种
“+----+”纵横关联结构满足了本地扩展、分布式扩展需求。这种设计应该是非常具有借鉴作用。
以QueryComponent为重点分析,其他Component的功能相对简单,并且没有设计读索引、排序等逻辑,就在此省去。另外,QueryComponent关联了QParse\Query\Weight\Score\Collector\DocSet\DocList查询底层数据结构,从queryComponent入口,可以帮助理顺SolrIndexSearcher是如何在Query类型控制下,读取索引,执行排序,并学习Solr在DocSet、DocList、Score、Collector的高效编码实现。
QueryComponent.prepare分析
public void prepare(ResponseBuilder rb) throws IOException
{
传参是ResponseBuilder。这个ResponseBuilder贯穿整个query的开始、结束。类似的设计,例如SolrCore 总管索引入口一样,由一个“全局熟悉”的对象来服务整个过程,使得参数的获取统一、结果的保存统一。这个是java的对象传引用相关的。eg 定义了一个List,然后作为参数传给某个方法,方法里面产生的结果写入参数list,这样list就带回了值。
处理fl参数。fl表示查询命中的文档需要返回的域信息。默认不写的话,就返回全部stored=true的字段。如果是mulitValue=true,字段是list类型,如果域值是null,返回时这个域可能直接没有显示。
处理queryType。queryType直接对query做非常非常重要的解析。毫无夸张的说,query解析直接反应了引擎的“智力”。解析的类型也绑定了weight的生成实现、score的生成,而similarity是通过SolrIndexSearcher从schema中带进来的。similarity是query层共享,和queryresultcache、filtercache等一样,被多个component处理过程共享。 处理QParser。需要的内容:被qparser处理生成的query、getsort参数,包括rows、start,这在collector过程heap生成有关。比较危险:当start非常大的时候,heap空间占用巨大,导致oom。这就是大量请求下solr最先出现问题的一搬都是OOM,尽管OOM了,但发现还是可以查询的。这个状态已经有风险了在数据一致性上。需要做监控对OOM的关键词。solr4 在深度翻页也就是start很大的时候,是保存上一层得分结果的,这样deep pagnation效果不错。
处理fq。fq一定是和q配合使用,光有fq,没有q的查询已经失去fq的意义了。测试同学有时候不理解,坚持这是bug。哈哈。。。要好好沟通下啊。fq是无排序结果的cache、并且key是固定的。也就是说走fq的场景是查询串中,那些不变、经常被查询的条件。那么可以将查询条件切的更细,每个细粒度做一个fq,依赖cache。但是fq也是比较耗内存的。因为保存了整个结果集。
处理shard。这部分是merge的参数处理。 所有处理过程的结果都写入ReponseBuilder中,带入process中。
QueryComponent.process分析
prepare部分看似简单不过了。但是提醒:qparser的定义和实现只是一笔带过,真正意思上qparser直接反应了引擎的水平。特别是对用户输入的理解程度。关于qparser的后续讲解,进入process阶段。
整个process可以抽取为三部分:ids、group、常规。ids是针对唯一id的快速查找。 group是query有了group相关参数后,就由group来执行查询。然后是常规的处理。ids是直接term 查询,group是group.execute来完成,search是searcher.search()来完成。下面只分析group、search()2部分。
timeAllowed 参数:如果超时就返回部分结果,并且这个时间对象是后台线程。最终作用在TimeLimitingCollector上。 这个设计也是非常值得学习的。下面贴出这个时间控制的4端代码。时间对象初始化、时间对象定义、超时作用点、超时退出. qTime就是时间信息的返回。
--------------------------------------------
private final static TimerThread TIMER_THREAD = new TimerThread(); //单例、后台线程 static {
TIMER_THREAD.start();
}
--------------------------------------------
private static final class TimerThread extends Thread {
// NOTE: we can avoid explicit synchronization here for several reasons:
// * updates to volatile long variables are atomic
// * only single thread modifies this value
// * use of volatile keyword ensures that it does not reside in
// a register, but in main memory (so that changes are visible to
// other threads).
// * visibility of changes does not need to be instantaneous, we can
// afford losing a tick or two.
//
// See section 17 of the Java Language Specification for details.
private volatile long time = 0;
private TimerThread() {
super("TimeLimitedCollector timer thread");
this.setDaemon( true );//后台线程 }
@Override
public void run() {
while (true) {
// TODO: Use System.nanoTime() when Lucene moves to Java SE 5.
time += resolution;
try {
Thread.sleep( resolution );
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
public long getMilliseconds() {
return time;
}
}
----------------------------------------------------------------------
@Override
public void collect(final int doc) throws IOException {
long time = TIMER_THREAD.getMilliseconds();
if (timeout < time) {
if (greedy) {
//System.out.println(this+" greedy: before failing, collecting doc: "+(docBase + doc)+" "+(time-t0));
collector.collect(doc);
}
//System.out.println(this+" failing on: "+(docBase + doc)+" "+(time-t0));
throw new TimeExceededException( timeout-t0, time-t0, docBase + doc ); //主动运行时异常退出 }
//System.out.println(this+" collecting: "+(docBase + doc)+" "+(time-t0));
collector.collect(doc);
}
---------------------------------------------------------------------
public static class TimeExceededException extends RuntimeException {//单例 private long timeAllowed;
private long timeElapsed;
private int lastDocCollected;
private TimeExceededException(long timeAllowed, long timeElapsed, int lastDocCollected) {
super("Elapsed time: " + timeElapsed + "Exceeded allowed search time: " + timeAllowed + " ms.");
this.timeAllowed = timeAllowed;
this.timeElapsed = timeElapsed;
this.lastDocCollected = lastDocCollected;
}
public long getTimeAllowed() {
return timeAllowed;
}
public long getTimeElapsed() {
return timeElapsed;
}
public int getLastDocCollected() {
return lastDocCollected;
}
}
--------------------------------------------------------------
group.execute
分两阶段收集。每阶段的收集器不同,但是执行的核心方法都是一样的 searcher.search(query, luceneFilter, secondPhaseCollectors);
createFirstPassCollector createSecondPassCollector封装group的两阶段过程。
第一阶段TermFirstPassGroupingCollector 继承AbstractFirstPassGroupingCollector, 其中collect的实现就是group功能的实现。这里依然遵循整个“查询框架”,就是每个doc的得分是命中的时刻就计算好了 computer-just-at once一次一算,不是事后计算,与全局相关的idf信息是在createWeight的时候就获取的,其他的tf、 norm、boost是具体docId 读取的。也就是docid 和freq是一块获取。(idf是倒排存储,docid、termfreq是正排存储)。而groupcollect做的事情多些而已。包括对fieldvaluecache的依赖(一个域对应一个fieldcache) @Override
public void setNextReader(IndexReader reader, int docBase) throws IOException {
super.setNextReader(reader, docBase);
index = FieldCache.DEFAULT.getStringIndex(reader, groupField);
}
TermAllGroupsCollector extends AbstractAllGroupsCollector<String> collect逻辑做了重写 查询结束、group信息也结束。
searcher.search(result,cmd);
searcher.search的层次清晰,从有cache的开始,然后到没有cache的。
--》 getDocListC(qr,cmd); //queryResultCache 有序得分结果
-->filterCache有序无得分 --》--》 getDocListNC(qr,cmd); // no cache 直接termquery的查询
getDocListNC(qr,cmd)结构也是非常清晰。
timeAllowed、needScore、ProcessedFilter(注意是对filter和filterlist的合并生成一个filter,实际是二者只存其一)
TopDocsCollector的初始化 进入lucene search 接口方法 @Override
public void search(Weight weight, Filter filter, Collector collector)
throws IOException {}
getDocListAndSetNC(qc,cmd);这个与getDocListNC(qc,cmd)结构非常类似
回到lucene search@Override
public void search(Weight weight, Filter filter, Collector collector)
throws IOException {
// TODO: should we make this
// threaded...? the Collector could be sync'd?
// always use single thread:
if (filter == null) {
for (int i = 0; i < subReaders.length; i++) { // search each subreader
collector.setNextReader(subReaders[i], docStarts[i]);//collector与reader绑定
Scorer scorer = weight.scorer(subReaders[i], !collector.acceptsDocsOutOfOrder(), true);
// scorer是weight的score,而weight是query的weight,并且weight的生成过程weight->sumOfSquaredWeights->normalize 不能缺少,否则score=0. createWeight(this)要主要这个this的具体值,如果没有设置自己的similarity就是DefaultSimilarity,也就是DefaultSimilarity的各个得分因子计算,如果有定义自己的similarity,那么这个this一定要传正确。
if (scorer != null) {
scorer.score(collector);//里面while循环收集文档
BooleanQuery-->BooleanWeight-->BooleanScore2 其中subScore对应TermQuery-->TermWeight-->TermScorer
BooleanSocre2 管理BooleanQuery下面的子score,BooleanScore2的makeCountingSumScorer来收集score合并。注意idf是createweight的时候,全局读取。其他参数具体reader读取,这样全局排序参数和单文档排序参数能够分开对待,idf的时候会收集term存在的reader,后面并且子reader执行之前做存在性判断,子reader能够遍历或者串行执行(默认的)。BooleanQuery是得分相加的。
}
}
} else {
for (int i = 0; i < subReaders.length; i++) { // search each subreader
collector.setNextReader(subReaders[i], docStarts[i]);
searchWithFilter(subReaders[i], weight, filter, collector);
}
}
}
public Weight createNormalizedWeight(Query query) throws IOException {
query = rewrite(query);
Weight weight = query.createWeight(this);//第一阶段 float sum = weight.sumOfSquaredWeights();//第二阶段参数初始化
// this is a hack for backwards compatibility:
float norm = query.getSimilarity(this).queryNorm(sum);
if (Float.isInfinite(norm) || Float.isNaN(norm))
norm = 1.0f;
weight.normalize(norm);//第三阶段参数初始化 return weight;
}
BooleanQuery的createWeight 封装query 从而原来的query可以共享 public BooleanWeight(Searcher searcher, boolean disableCoord)
throws IOException {
this.similarity = getSimilarity(searcher);
this.disableCoord = disableCoord;
weights = new ArrayList<Weight>(clauses.size());
for (int i = 0 ; i < clauses.size(); i++) {
BooleanClause c = clauses.get(i);
weights.add(c.getQuery().createWeight(searcher));
if (!c.isProhibited()) maxCoord++;
}
}
BooleanScorer2.score执行循环收集,会总子reader的子score合并收集放置上一层collector里面 @Override
public void score(Collector collector) throws IOException {
collector.setScorer(this);
while ((doc = countingSumScorer.nextDoc()) != NO_MORE_DOCS) {
collector.collect(doc);
}
}
关于collector的合并,可以参考http://www.cnblogs.com/forfuture1978/archive/2010/04/04/1704258.html