2021SC@SDUSC
在 Lucene 中,文档是按添加的顺序编号的,DocumentsWriter 中的 nextDocID 就是记录下一个添加的文档 id。 当 Lucene 支持多线程的时候,就必须要有一个 synchornized 方法来付给文档 id 并且将 nextDocID 加一,这些是在 DocumentsWriter.getThreadState 这个函数里面做的。
虽然给文档付 ID 没有问题了。但是由 Lucene 索引文件格式我们知道,文档是要按照 ID 的顺序从小到大写到索引文件中去的,然而不同的文档处理速度不同,当一个先来的线程一处理一篇需要很长时间的大文档时,另一个后来的线程二可能已经处理了很多小的文档了,但是这些后来小文档的ID号都大于第一个线程所处理的大文档,因而不能马上写到索引文件中去,
而是放到 waitQueue 中,仅仅当大文档处理完了之后才写入索引文件。
waitQueue 中有一个变量 nextWriteDocID 表示下一个可以写入文件的 ID,当付给大文档 ID=4时,则 nextWriteDocID 也设为 4,虽然后来的小文档 5,6,7,8 等都已处理结束,但是如下代码,
WaitQueue.add(){
if (doc.docID == nextWriteDocID){
…………
} else {
waiting[loc] = doc;
waitingBytes += doc.sizeInBytes();
}
doPause()
}
则把 5, 6, 7, 8 放入 waiting 队列,并且记录当前等待的文档所占用的内存大小 waitingBytes。
当大文档 4 处理完毕后,不但写入文档 4,把原来等待的文档 5, 6, 7, 8 也一起写入。
WaitQueue.add(){
if (doc.docID == nextWriteDocID) {
writeDocument(doc);
while(true) {
doc = waiting[nextWriteLoc];
writeDocument(doc);
}
} else {
…………
}
doPause()
}
但是这存在一个问题:当大文档很大很大,处理的很慢很慢的时候,后来的线程二可能已经
处理了很多的小文档了,这些文档都是在 waitQueue 中,则占有了越来越多的内存,长此以
往,有内存不够的危险。
因而在 finishDocuments 里面,在 WaitQueue.add 最后调用了 doPause()函数
DocumentsWriter.finishDocument(){
doPause = waitQueue.add(docWriter);
if (doPause)
waitForWaitQueue();
notifyAll();
}
WaitQueue.doPause() {
return waitingBytes > waitQueuePauseBytes;
}
当 waitingBytes 足够大的时候(为用户指定的内存使用量的 10%),doPause 返回 true,于是后
来的线程二会进入 wait 状态,不再处理另外的文档,而是等待线程一处理大文档结束。
当线程一处理大文档结束的时候,调用 notifyAll 唤醒等待他的线程。
DocumentsWriter.waitForWaitQueue() {
do {
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
} while (!waitQueue.doResume());
}
WaitQueue.doResume() {
return waitingBytes <= waitQueueResumeBytes;
}
当 waitingBytes 足够小的时候,doResume 返回 true, 则线程二不用再 wait 了,可以继续处理另外的文档。
一些标志位
int maxFieldLength:一篇文档中,一个域内可索引的最大的词(Term)数。
int maxBufferedDeleteTerms:可缓存的最大的删除词(Term)数。当大于这个数的时候,就要写到文件中了。
此过程又包含如下过程:
得到当前线程对应的文档集处理对象 得到当前线程对应的文档集处理对象
(DocumentsWriterThreadState)
代码为:
DocumentsWriterThreadState state = getThreadState(doc, delTerm);
在 Lucene 中,对于同一个索引文件夹,只能够有一个 IndexWriter 打开它,在打开后,在文件夹中,生成文件 write.lock,当其他 IndexWriter 再试图打开此索引文件夹的时候,则会报org.apache.lucene.store.LockObtainFailedException 错误。
这样就出现了这样一个问题,在同一个进程中,对同一个索引文件夹,只能有一个 IndexWriter打开它,因而如果想多线程向此索引文件夹中添加文档,则必须共享一个 IndexWriter,而且在以往的实现中,addDocument 函数是同步的(synchronized),也即多线程的索引并不能起到提高性能的效果。
于是为了支持多线程索引,不使 IndexWriter 成为瓶颈,对于每一个线程都有一个相应的文档集处理对象(DocumentsWriterThreadState),这样对文档的索引过程可以多线程并行进行,从而增加索引的速度。
getThreadState 函数是同步的 (synchronized) , DocumentsWriter 有一个成员变量threadBindings,它是一个 HashMap,键为线程对象Thread.currentThread()),值为此线程对应的 DocumentsWriterThreadState 对象。
DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下几个过程:
根据当前线程对象,从 HashMap 中查找相应的DocumentsWriterThreadState 对象,如果没找到,则生成一个新对象,并添加到 HashMap 中
DocumentsWriterThreadState state = (DocumentsWriterThreadState)
threadBindings.get(Thread.currentThread());
if (state == null) {
……
state = new DocumentsWriterThreadState(this);
……
threadBindings.put(Thread.currentThread(), state);
}
如果此线程对象正在用于处理上一篇文档,则等待,直到此线程的上一篇文档处理完。
DocumentsWriter.getThreadState() {
waitReady(state);
state.isIdle = false;
}
waitReady(state) {
while (!state.isIdle) {wait();}
}
显然如果 state.isIdle 为 false,则此线程等待。
在一篇文档处理之前,state.isIdle = false 会被设定,而在一篇文档处理完毕之后,
DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)
中,会首先设定 perThread.isIdle = true; 然后 notifyAll()来唤醒等待此文档完成的线程,从而处理下一篇文档。
如果 IndexWriter 刚刚 commit 过,则新添加的文档要加入到新的段中(segment),则首先要生成新的段名。
initSegmentName(false);
–> if (segment == null) segment = writer.newSegmentName();
将此线程的文档处理对象设为忙碌:state.isIdle = false;