标签:
ucene将文档加入DocumentsWriter代码调用层次结构如下:
DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm)
-->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm);
-->(2) DocWriter perDoc = state.consumer.processDocument();
-->(3) finishDocument(state, perDoc);
DocumentsWriter对象主要包含以下几部分:
* 用于写索引文件
IndexWriter writer;
Directory directory;
Similarity similarity:分词器
String segment:当前的段名,每当flush的时候,将索引写入以此为名称的段。
IndexWriter.doFlushInternal()
--> String segment = docWriter.getSegment();//return segment
--> newSegment = new SegmentInfo(segment,……);
--> docWriter.createCompoundFile(segment);//根据segment创建cfs文件。
String docStoreSegment:存储域所要写入的目标段。(在索引文件格式一文中已经详细描述)
int docStoreOffset:存储域在目标段中的偏移量。
int nextDocID:下一篇添加到此索引的文档ID号,对于同一个索引文件夹,此变量唯一,且同步访问。
DocConsumer consumer; 这是整个索引过程的核心,是IndexChain整个索引链的源头。
基本索引链:
对于一篇文档的索引过程,不是由一个对象来完成的,而是用对象组合的方式形成的一个处理链,链上的每个对象仅
理索引过程的一部分,称为索引链,由于后面还有其他的索引链,所以此处的索引链我称为基本索引链。
DocConsumer consumer 类型为DocFieldProcessor,是整个索引链的源头,包含如下部分:
* 对索引域的处理
DocFieldConsumer consumer 类型为DocInverter,包含如下部分
InvertedDocConsumer consumer类型为TermsHash,包含如下部分
TermsHashConsumer consumer类型为FreqProxTermsWriter,负责写freq, pro
TermsHash nextTermsHash
TermsHashConsumer consumer类型为TermVectorsTermsWriter,负责tvd, tvf信息
InvertedDocEndConsumer endConsumer 类型为NormsWriter,负责写nrm信息
* 对存储域的处理
FieldInfos fieldInfos = new FieldInfos();
StoredFieldsWriter fieldsWriter负责写fnm, fdt, fdx信息
* 删除文档
BufferedDeletes deletesInRAM = new BufferedDeletes();
BufferedDeletes deletesFlushed = new BufferedDeletes();
类BufferedDeletes包含了一下的成员变量:
* HashMap terms = new HashMap();删除的词(Term)
* HashMap queries = new HashMap();删除的查询(Query)
* List docIDs = new ArrayList();删除的文档ID
* long bytesUsed:用于判断是否应该对删除的文档写入索引文件。
由此可见,文档的删除主要有三种方式:
* IndexWriter.deleteDocuments(Term term):所有包含此词的文档都会被删除。
* IndexWriter.deleteDocuments(Query query):所有能满足此查询的文档都会被删除。
* IndexReader.deleteDocument(int docNum):删除此文档ID
删除文档既可以用reader进行删除,也可以用writer进行删除,不同的是,reader进行删除后,此reader马上能够生
而用writer删除后,会被缓存在deletesInRAM及deletesFlushed中,只有写入到索引文件中,当reader再次打开的
才能够看到。
那deletesInRAM和deletesFlushed各有什么用处呢?
此版本的Lucene对文档的删除是支持多线程的,当用IndexWriter删除文档的时候,都是缓存在deletesInRAM中
flush,才将删除的文档写入到索引文件中去,我们知道flush是需要一段时间的,那么在flush的过程中,另一个线
档删除怎么办呢?
一般过程是这个样子的,当flush的时候,首先在同步(synchornized)的方法pushDeletes中,将deletesInRAM全部
deletesFlushed中,然后将deletesInRAM清空,退出同步方法,于是flush的线程就向索引文件写deletesFlushed
删除文档的过程,而与此同时其他线程新删除的文档则添加到新的deletesInRAM中去,直到下次flush才写入索引文件
* 缓存管理
为了提高索引的速度,Lucene对很多的数据进行了缓存,使一起写入磁盘,然而缓存需要管理,何时分配,何时回收,何时写入磁盘都需要考虑。
ArrayList freeCharBlocks = new ArrayList();将用于缓存词(Term)信息的空闲块
ArrayList freeByteBlocks = new ArrayList();将用于缓存文档号(doc id)及词频(freq),位(prox)信息的空闲块。
ArrayList freeIntBlocks = new ArrayList();将存储某词的词频(freq)和位置(prox)分别在
byteBlocks中的偏移量
boolean bufferIsFull;用来判断缓存是否满了,如果满了,则应该写入磁盘
long numBytesAlloc;分配的内存数量
long numBytesUsed;使用的内存数量
long freeTrigger;应该开始回收内存时的内存用量。
long freeLevel;回收内存应该回收到的内存用量。
long ramBufferSize;用户设定的内存用量。
缓存用量之间的关系如下:
DocumentsWriter.setRAMBufferSizeMB(double mb){
ramBufferSize = (long) (mb*1024*1024);//用户设定的内存用量,当使用内存大于此时,开始写入磁盘
waitQueuePauseBytes = (long) (ramBufferSize*0.1);
waitQueueResumeBytes = (long) (ramBufferSize*0.05);
freeTrigger = (long) (1.05 * ramBufferSize);//当分配的内存到达105%的时候开始释放freeBlocks中的内存
freeLevel = (long) (0.95 * ramBufferSize);//一直释放到95%
}
DocumentsWriter.balanceRAM(){
if (numBytesAlloc+deletesRAMUsed > freeTrigger) {
//当分配的内存加删除文档所占用的内存大于105%的时候,开始释放内存
while(numBytesAlloc+deletesRAMUsed > freeLevel) {
//一直进行释放,直到95%
//释放free blocks
byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
numBytesAlloc -= BYTE_BLOCK_SIZE;
freeCharBlocks.remove(freeCharBlocks.size()-1);
numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
freeIntBlocks.remove(freeIntBlocks.size()-1);
numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
}
} else {
if (numBytesUsed+deletesRAMUsed > ramBufferSize){
//当使用的内存加删除文档占有的内存大于用户指定的内存时,可以写入磁盘
bufferIsFull = true;
}
}
}
当判断是否应该写入磁盘时:
* 如果使用的内存大于用户指定内存时,bufferIsFull = true
* 当使用的内存加删除文档所占的内存加正在写入的删除文档所占的内存大于用户指定内存时
deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize
* 当删除的文档数目大于maxBufferedDeleteTerms时
DocumentsWriter.timeToFlushDeletes(){
return (bufferIsFull || deletesFull()) && setFlushPending();
}
DocumentsWriter.deletesFull(){
return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH &&
(deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) ||
(maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH &&
((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
}
多线程并发索引
为了支持多线程并发索引,对每一个线程都有一个DocumentsWriterThreadState,其为每一
个线程根据DocConsumer consumer的索引链来创建每个线程的索引链(XXXPerThread),来
进行对文档的并发处理。
DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
HashMap threadBindings = new HashMap();
虽然对文档的处理过程可以并行,但是将文档写入索引文件却必须串行进行,串行写入的代码
在DocumentsWriter.finishDocument中
WaitQueue waitQueue = new WaitQueue()
long waitQueuePauseBytes
long waitQueueResumeBytes
在Lucene中,文档是按添加的顺序编号的,DocumentsWriter中的nextDocID就是记录下一个添加的文档id。 当Lucene支持
持多线程的时候,就必须要有一个synchornized方法来付给文档id并且将nextDocID加一,这些是在DocumentsWriter.getThreadState这个函数里面做的。
虽然给文档付ID没有问题了。但是由Lucene索引文件格式我们知道,文档是要按照ID的顺序从小到大写到索引文件
然而不同的文档处理速度不同,当一个先来的线程一处理一篇需要很长时间的大文档时,另一个后来的线程二可能已
了很多小的文档了,但是这些后来小文档的ID号都大于第一个线程所处理的大文档,因而不能马上写到索引文件中去
而是放到waitQueue中,仅仅当大文档处理完了之后才写入索引文件。
waitQueue中有一个变量nextWriteDocID表示下一个可以写入文件的ID,当付给大文档ID=4时,则nextWriteDocID
为4,虽然后来的小文档5,6,7,8等都已处理结束,但是如下代码,
WaitQueue.add(){
if (doc.docID == nextWriteDocID){
…………
} else {
waiting[loc] = doc;
waitingBytes += doc.sizeInBytes();
}
doPause()
}
但是这存在一个问题:当大文档很大很大,处理的很慢很慢的时候,后来的线程二可能已经处理了很多的小文档了
档都是在waitQueue中,则占有了越来越多的内存,长此以往,有内存不够的危险。
因而在finishDocuments里面,在WaitQueue.add最后调用了doPause()函数
DocumentsWriter.finishDocument(){
doPause = waitQueue.add(docWriter);
if (doPause)
waitForWaitQueue();
notifyAll();//lucene.net采用System.Threading.Monitor.PulseAll(this)通知每个线程
}
WaitQueue.doPause() {
return waitingBytes > waitQueuePauseBytes;
}
当waitingBytes足够大的时候(为用户指定的内存使用量的10%),doPause返回true,于是后来的线程二会进入wait
不再处理另外的文档,而是等待线程一处理大文档结束。
当线程一处理大文档结束的时候,调用notifyAll唤醒等待他的线程。
DocumentsWriter.waitForWaitQueue() {
do {
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
} while (!waitQueue.doResume());
}
WaitQueue.doResume() {
return waitingBytes <= waitQueueResumeBytes;
}
当waitingBytes足够小的时候,doResume返回true, 则线程二不用再wait了,可以继续处理另外的文档。
• 一些标志位
? int maxFieldLength:一篇文档中,一个域内可索引的最大的词(Term)数。
? int maxBufferedDeleteTerms:可缓存的最大的删除词(Term)数。当大于这个数的时候,就要
写到文件中了。
此过程又包含如下三个子过程:
1、得到当前线程对应的文档集处理对象(DocumentsWriterThreadState)
2、用得到的文档集处理对象(DocumentsWriterThreadState)处理文档
3、用DocumentsWriter.finishDocument结束本次文档添加
标签:
原文地址:http://www.cnblogs.com/mggwct/p/4876958.html