Lucene reader计数
Lucene读索引是reader共享、写索引是indexWriter,二者分离开来的。
对于读索引reader是并发的、资源释放是通过计数来实现的。关键代码如下
参考lucene 3.4:
//定义计数器
private final AtomicInteger refCount = new AtomicInteger();
publicintgetRefCount() {
returnrefCount.get();
}
//访问时候++
publicvoidincRef() {
ensureOpen();
refCount.incrementAndGet();
}
//退出的时候--
publicvoiddecRef()throwsIOException {
ensureOpen();
if(refCount.getAndDecrement() == 1) {
booleansuccess =false;
try{
commit();
doClose();
success =true;
}finally{
if(!success) {
// Put reference back on failure
refCount.incrementAndGet();
}
}
readerFinished();
}
}
//初始化打开reader的时候计数为1
protectedIndexReader() {
refCount.set(1);
}
工作原理:
初始化reader的时候计数为1,包括reopen的时候也是设置1;
通过getSearcher打开SolrIndexReader,这个时候SolrIndexReader里面的具体reader会计数++
getSearcher完毕,会执行close,调用SolrIndexReader里面具体reader计数--
这样在正常情况下,reader总是打开的,因为计数1.只有在SolrCore关闭的时候调用close方法,才会计数--,释放reader资源。
使用注意:
solrCore自身也是计数的,如果solrCore close的时候,没有触发SolrCore真正关闭,那么reader也没关闭,间接导致内存泄露。
这也是之前终搜老版本通过while 强制solrCore decf 来关闭,会导致资源未释放,在某些场景。或者提前释放,导致句柄存在
solrCore 在solr的使用规范是通过CoreContaner获取,每次用完都会close的。我们破坏了这个用法,导致某些场景资源关闭异常。
未来需要定制的、并且需要扩展reader的场景,计数地方大家留个心。
虚拟分组计数
虚拟分组,计数较为复杂些。主要是存在内存reopen、flushreopen、main reopen、merger reopen,
启动时候:reader Open时候++- --> splitReader时候++ --> CoreReader(SolrIndexReader的++反应),也就是说默认时候索引reader计数是3
Ram reopen的时候split层new ,计数++,这个时候core 层不变
Flush时候reopen,open自身++,split层new 计数++,core层不变
切换后 split整个替换,计数回到3
从而close split 、close flush 直接调用close不行,需要多执行2次decf。
索引视图更新的异步reopen
索引视图更新,在solrCore是通过getSearcher间接调用reader的reopen来完成。
当前tsearcher版本都是遵循这个机制。
Searcher的共享使得reader可共享,内存之外的reader不轻易reopen。
实时模式下,reopen ram,而磁盘、主索引的非reopen,使得内存索引可见得到及时完成。
当时reopen是非常消耗资源的,因为需要加载索引的“词典信息”。
异步打开就是首选了。
非虚拟分组的时候,采取的是下面的线程池参数
finalBlockingQueueblockQueue=newArrayBlockingQueue(
2);
finalExecutorServicesearcherExecutor=newThreadPoolExecutor(1, 1,
(long) 300, TimeUnit.SECONDS,blockQueue);
其中参数用的非常巧妙!否则会出现DeckSearcher,导致资源泄露、性能下降
虚拟分组,由于每个分片独立提交并触发reopen。在重启或者add更新量非常巨大瞬间,上面的线程池结构会出现下面的异常。
java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
由于,不能强制关闭已经merger的索引,因为可能有请求在使用,也就不能立即清除已经合并的子索引目录。只好在正在close的时候关闭。
改进异步提交的参数
blockQueue =new LinkedBlockingQueue();
searcherExecutor =newThreadPoolExecutor(1, 1,
(long) 0, TimeUnit.SECONDS,blockQueue);
任务以job形式、串行工作。Job里面,会在searcher打开后执行close资源,然后close里面判断是否清理cache、删除目录等。
内存reopen在虚拟分组场景下是可以合并的。在flush、merge时候需要reopen,内存reopen被合并。
线程池的参数选择以及关闭和释放的资源,直接影响了局部、资源泄露等。再多次尝试后,才稳定下来。
关于线程池参数这部分,这篇博文写的比较简单明了。http://dongxuan.iteye.com/blog/901689
所有BlockingQueue都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
· 如果运行的线程少于corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)
· 如果运行的线程等于或多于corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
· 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝
· ------------------------------------------------------------------
虚拟分组异步打开源码部分如下。
public voidreopenSearcher(booleanisDisk) {
if(isRamRuning.get() )return;
else{
isRamRuning.set(true) ;
FreshJob4RamReopen job =newFreshJob4RamReopen(solrCore);
job.run();
isRamRuning.set(false);
//searcherExecutor.submit(job);
//blockQueue.add(job);
}
}
public voidreopenSearcher(booleanisDisk,RealTimeIndexReader4VroutExtend toCloseReader,IndexWriter oldRamWriter) {
FreshJob4RamFlush job =newFreshJob4RamFlush(solrCore,isRamRuning);
job.setToCloseReader( toCloseReader );
job.setOldRamWriter( oldRamWriter );
searcherExecutor.submit(job);
//blockQueue.add(job);
}
public voidreopenSearcher(booleanisDisk,List flushMergeReaders) {
FreshJob4Merger job =newFreshJob4Merger(solrCore,isRamRuning);
job.setFlushMergeReaders(flushMergeReaders);
searcherExecutor.submit(job);
//blockQueue.add(job);
}