lucene核心代码分析10

2021SC@SDUSC
在 Lucene 中,文档是按添加的顺序编号的,DocumentsWriter 中的 nextDocID 就是记录下一个添加的文档 id。 当 Lucene 支持多线程的时候,就必须要有一个 synchornized 方法来付给文档 id 并且将 nextDocID 加一,这些是在 DocumentsWriter.getThreadState 这个函数里面做的。
虽然给文档付 ID 没有问题了。但是由 Lucene 索引文件格式我们知道,文档是要按照 ID 的顺序从小到大写到索引文件中去的,然而不同的文档处理速度不同,当一个先来的线程一处理一篇需要很长时间的大文档时,另一个后来的线程二可能已经处理了很多小的文档了,但是这些后来小文档的ID号都大于第一个线程所处理的大文档,因而不能马上写到索引文件中去,
而是放到 waitQueue 中,仅仅当大文档处理完了之后才写入索引文件。
waitQueue 中有一个变量 nextWriteDocID 表示下一个可以写入文件的 ID,当付给大文档 ID=4时,则 nextWriteDocID 也设为 4,虽然后来的小文档 5,6,7,8 等都已处理结束,但是如下代码,

WaitQueue.add(){ 
 if (doc.docID == nextWriteDocID){ 
 ………… 
 } else { 
 waiting[loc] = doc; 
 waitingBytes += doc.sizeInBytes(); 
 } 
 doPause() 
} 

则把 5, 6, 7, 8 放入 waiting 队列,并且记录当前等待的文档所占用的内存大小 waitingBytes。
当大文档 4 处理完毕后,不但写入文档 4,把原来等待的文档 5, 6, 7, 8 也一起写入。

WaitQueue.add(){ 
 if (doc.docID == nextWriteDocID) { 
 writeDocument(doc); 
 while(true) { 
 doc = waiting[nextWriteLoc]; 
 writeDocument(doc); 
 } 
 } else { 
 ………… 
 } 
 doPause() 
} 

但是这存在一个问题:当大文档很大很大,处理的很慢很慢的时候,后来的线程二可能已经
处理了很多的小文档了,这些文档都是在 waitQueue 中,则占有了越来越多的内存,长此以
往,有内存不够的危险。
因而在 finishDocuments 里面,在 WaitQueue.add 最后调用了 doPause()函数

DocumentsWriter.finishDocument(){ 
 doPause = waitQueue.add(docWriter); 
 if (doPause) 
 waitForWaitQueue(); 
 notifyAll(); 
} 
WaitQueue.doPause() { 
 return waitingBytes > waitQueuePauseBytes; 
} 

当 waitingBytes 足够大的时候(为用户指定的内存使用量的 10%),doPause 返回 true,于是后
来的线程二会进入 wait 状态,不再处理另外的文档,而是等待线程一处理大文档结束。
当线程一处理大文档结束的时候,调用 notifyAll 唤醒等待他的线程。

DocumentsWriter.waitForWaitQueue() { 
 do { 
 try { 
 wait(); 
 } catch (InterruptedException ie) { 
 throw new ThreadInterruptedException(ie); 
 } 
 } while (!waitQueue.doResume()); 
} 
WaitQueue.doResume() { 
 return waitingBytes <= waitQueueResumeBytes; 
} 

当 waitingBytes 足够小的时候,doResume 返回 true, 则线程二不用再 wait 了,可以继续处理另外的文档。
一些标志位
int maxFieldLength:一篇文档中,一个域内可索引的最大的词(Term)数。
int maxBufferedDeleteTerms:可缓存的最大的删除词(Term)数。当大于这个数的时候,就要写到文件中了。

此过程又包含如下过程:
得到当前线程对应的文档集处理对象 得到当前线程对应的文档集处理对象
(DocumentsWriterThreadState)
代码为:

DocumentsWriterThreadState state = getThreadState(doc, delTerm); 

在 Lucene 中,对于同一个索引文件夹,只能够有一个 IndexWriter 打开它,在打开后,在文件夹中,生成文件 write.lock,当其他 IndexWriter 再试图打开此索引文件夹的时候,则会报org.apache.lucene.store.LockObtainFailedException 错误。
这样就出现了这样一个问题,在同一个进程中,对同一个索引文件夹,只能有一个 IndexWriter打开它,因而如果想多线程向此索引文件夹中添加文档,则必须共享一个 IndexWriter,而且在以往的实现中,addDocument 函数是同步的(synchronized),也即多线程的索引并不能起到提高性能的效果。
于是为了支持多线程索引,不使 IndexWriter 成为瓶颈,对于每一个线程都有一个相应的文档集处理对象(DocumentsWriterThreadState),这样对文档的索引过程可以多线程并行进行,从而增加索引的速度。
getThreadState 函数是同步的 (synchronized) , DocumentsWriter 有一个成员变量threadBindings,它是一个 HashMap,键为线程对象Thread.currentThread()),值为此线程对应的 DocumentsWriterThreadState 对象。

DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下几个过程:
根据当前线程对象,从 HashMap 中查找相应的DocumentsWriterThreadState 对象,如果没找到,则生成一个新对象,并添加到 HashMap 中

DocumentsWriterThreadState state = (DocumentsWriterThreadState) 
threadBindings.get(Thread.currentThread()); 
if (state == null) { 
 …… 
 state = new DocumentsWriterThreadState(this); 
 …… 
 threadBindings.put(Thread.currentThread(), state); 
} 

如果此线程对象正在用于处理上一篇文档,则等待,直到此线程的上一篇文档处理完。

DocumentsWriter.getThreadState() { 
 waitReady(state); 
 state.isIdle = false; 
} 
waitReady(state) { 
 while (!state.isIdle) {wait();} 
} 

显然如果 state.isIdle 为 false,则此线程等待。
在一篇文档处理之前,state.isIdle = false 会被设定,而在一篇文档处理完毕之后,

DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)

中,会首先设定 perThread.isIdle = true; 然后 notifyAll()来唤醒等待此文档完成的线程,从而处理下一篇文档。
如果 IndexWriter 刚刚 commit 过,则新添加的文档要加入到新的段中(segment),则首先要生成新的段名。
initSegmentName(false);
–> if (segment == null) segment = writer.newSegmentName();

将此线程的文档处理对象设为忙碌:state.isIdle = false;

上一篇:WorkManager 在多进程应用中的高级用法


下一篇:计算机毕业设计之 [含开题报告+论文+源码等]基于Lucene全文检索框架实现的博客管理系统