The previous two sections covered the SolrCloud indexing process and the first two steps of the index chain, LogUpdateProcessorFactory and DistributedUpdateProcessor. This section looks at the third step in detail: DirectUpdateHandler2 and the UpdateLog.
The DirectUpdateHandler2 stage spans the path from Solr down into Lucene indexing, and is the most complex and most important stage of the whole index chain. First, let's look at the DirectUpdateHandler2 configuration in solrconfig.xml:
```xml
<updateHandler class="solr.DirectUpdateHandler2">

  <autoCommit>
    <maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
    <maxDocs>${solr.autoCommit.maxDocs:25000}</maxDocs>
    <openSearcher>false</openSearcher>
  </autoCommit>

  <autoSoftCommit>
    <maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
    <maxDocs>${solr.autoSoftCommit.maxDocs:1000}</maxDocs>
  </autoSoftCommit>

</updateHandler>
```
Two main parameter groups stand out in the configuration above: autoCommit and autoSoftCommit.
autoCommit and autoSoftCommit were introduced briefly before, and the topic is widely covered elsewhere, so it is not detailed here. Instead, let's focus on the flow of DirectUpdateHandler2.
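To make the two thresholds concrete, here is a minimal sketch of how a maxDocs/maxTime tracker decides when a commit is due. The class and method names (ThresholdTracker, docAdded) are illustrative, not Solr's actual CommitTracker API:

```java
// Hypothetical sketch of how autoCommit thresholds fire.
// A commit is due once either maxDocs uncommitted documents have
// accumulated, or maxTimeMs has elapsed since the first pending one.
public class ThresholdTracker {
    private final int maxDocs;
    private final long maxTimeMs;
    private int pendingDocs = 0;
    private long firstDocAt = -1;   // millis of first uncommitted doc

    public ThresholdTracker(int maxDocs, long maxTimeMs) {
        this.maxDocs = maxDocs;
        this.maxTimeMs = maxTimeMs;
    }

    /** Record one added document; return true if a commit should fire now. */
    public boolean docAdded(long nowMs) {
        if (pendingDocs == 0) firstDocAt = nowMs;
        pendingDocs++;
        return pendingDocs >= maxDocs || (nowMs - firstDocAt) >= maxTimeMs;
    }

    /** Reset after a commit has run. */
    public void committed() {
        pendingDocs = 0;
        firstDocAt = -1;
    }
}
```

With the defaults above (maxTime 15000 ms), a trickle of documents is still committed within 15 seconds, while a burst commits as soon as maxDocs is reached.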
The previous section showed that DirectUpdateHandler2 is invoked from versionAdd() in the DistributedUpdateProcessor stage. Taking the add path as an example, RunUpdateProcessorFactory.processAdd():
```java
public void processAdd(AddUpdateCommand cmd) throws IOException {

  if (DistributedUpdateProcessor.isAtomicUpdate(cmd)) {
    throw new SolrException
        (SolrException.ErrorCode.BAD_REQUEST,
         "RunUpdateProcessor has recieved an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain");
  }

  updateHandler.addDoc(cmd);
  super.processAdd(cmd);
  changesSinceCommit = true;
}
```
Next, let's look at addDoc0(), which contains the whole add flow of DirectUpdateHandler2. The logic is fairly straightforward; just note the following point:
deletesAfter = ulog.getDBQNewer(cmd.version); fetches the delete-by-query entries in the ulog that are newer than this add's version (i.e. reordered deletes), and those deletes are then applied.
```java
private int addDoc0(AddUpdateCommand cmd) throws IOException {
  int rc = -1;
  RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
  try {
    IndexWriter writer = iw.get();
    addCommands.incrementAndGet();
    addCommandsCumulative.incrementAndGet();

    // if there is no ID field, don't overwrite
    if (idField == null) {
      cmd.overwrite = false;
    }

    try {
      IndexSchema schema = cmd.getReq().getSchema();

      if (cmd.overwrite) {

        // Check for delete by query commands newer (i.e. reordered). This
        // should always be null on a leader
        List<UpdateLog.DBQ> deletesAfter = null;
        if (ulog != null && cmd.version > 0) {
          deletesAfter = ulog.getDBQNewer(cmd.version);
        }

        if (deletesAfter != null) {
          log.info("Reordered DBQs detected. Update=" + cmd + " DBQs="
              + deletesAfter);
          List<Query> dbqList = new ArrayList<>(deletesAfter.size());
          for (UpdateLog.DBQ dbq : deletesAfter) {
            try {
              DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
              tmpDel.query = dbq.q;
              tmpDel.version = -dbq.version;
              dbqList.add(getQuery(tmpDel));
            } catch (Exception e) {
              log.error("Exception parsing reordered query : " + dbq, e);
            }
          }

          addAndDelete(cmd, dbqList);
        } else {
          // normal update

          Term updateTerm;
          Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
          boolean del = false;
          if (cmd.updateTerm == null) {
            updateTerm = idTerm;
          } else {
            // this is only used by the dedup update processor
            del = true;
            updateTerm = cmd.updateTerm;
          }

          if (cmd.isBlock()) {
            writer.updateDocuments(updateTerm, cmd, schema.getAnalyzer());
          } else {
            Document luceneDocument = cmd.getLuceneDocument();
            // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
            writer.updateDocument(updateTerm, luceneDocument, schema.getAnalyzer());
          }
          // SolrCore.verbose("updateDocument",updateTerm,"DONE");

          if (del) { // ensure id remains unique
            BooleanQuery bq = new BooleanQuery();
            bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
            bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
            writer.deleteDocuments(bq);
          }

          // Add to the transaction log *after* successfully adding to the
          // index, if there was no error.
          // This ordering ensures that if we log it, it's definitely been
          // added to the index.
          // This also ensures that if a commit sneaks in-between, that we
          // know everything in a particular log version was definitely
          // committed.
          if (ulog != null) ulog.add(cmd);
        }

      } else {
        // allow duplicates
        if (cmd.isBlock()) {
          writer.addDocuments(cmd, schema.getAnalyzer());
        } else {
          writer.addDocument(cmd.getLuceneDocument(), schema.getAnalyzer());
        }

        if (ulog != null) ulog.add(cmd);
      }

      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
        if (commitWithinSoftCommit) {
          commitTracker.addedDocument(-1);
          softCommitTracker.addedDocument(cmd.commitWithin);
        } else {
          softCommitTracker.addedDocument(-1);
          commitTracker.addedDocument(cmd.commitWithin);
        }
      }

      rc = 1;
    } finally {
      if (rc != 1) {
        numErrors.incrementAndGet();
        numErrorsCumulative.incrementAndGet();
      } else {
        numDocsPending.incrementAndGet();
      }
    }

  } finally {
    iw.decref();
  }

  return rc;
}
```
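The getDBQNewer() call in the overwrite branch can be pictured as filtering recent delete-by-query entries by version. Below is a minimal, self-contained sketch; DBQ here is a simplified stand-in for UpdateLog.DBQ, which in Solr additionally carries the query string:

```java
import java.util.ArrayList;
import java.util.List;

public class DbqFilter {
    /** Simplified stand-in for UpdateLog.DBQ: just the assigned version. */
    public static class DBQ {
        public final long version;
        public DBQ(long version) { this.version = version; }
    }

    /**
     * Return the DBQs whose version is strictly greater than the add's
     * version, i.e. deletes that logically happened after this add but
     * arrived (and were logged) before it. Returns null when there are
     * none, matching the null check in addDoc0.
     */
    public static List<DBQ> getDBQNewer(List<DBQ> recentDBQs, long version) {
        List<DBQ> newer = null;
        for (DBQ dbq : recentDBQs) {
            if (dbq.version > version) {
                if (newer == null) newer = new ArrayList<>();
                newer.add(dbq);
            }
        }
        return newer;
    }
}
```

On a leader this should always return null, because the leader assigns versions in arrival order; only replicas can see a reordered delete.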
UpdateLog.add() is also fairly simple, and breaks down into three main steps: ensureLog() opens a tlog file if none is active; the command is appended to the TransactionLog, which returns the record's file position; and that position is stored in an in-memory map keyed by document id, so realtime get can serve the latest version. The update log itself is enabled in solrconfig.xml:
```xml
<updateLog>
  <str name="dir">${solr.ulog.dir:}</str>
</updateLog>
```
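The three steps of UpdateLog.add() can be sketched roughly as follows. This is a heavy simplification (the real method also handles flags, buffering, and delete tracking), and the class and field names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of UpdateLog.add(): ensure a tlog exists,
// append the command, and remember where it landed so realtime-get can
// find the latest version of each document before it is committed.
public class MiniUpdateLog {
    private final Map<String, Long> map = new HashMap<>(); // id -> file position
    private long nextPos = 0;  // stands in for TransactionLog.write's return value

    public synchronized long add(String docId) {
        // (1) ensureLog(): open a new tlog file if none is active (elided here)
        // (2) tlog.write(cmd): append the record, get back its file position
        long pos = nextPos++;
        // (3) record the pointer in the in-memory map for realtime get
        map.put(docId, pos);
        return pos;
    }

    public synchronized Long lookup(String docId) {
        return map.get(docId);
    }
}
```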
At first glance the add path of DirectUpdateHandler2 looks simple, but once add interacts with commit and with update-log recovery, the process becomes considerably more complex. This section first introduces the smallest unit of the update log: the TransactionLog.
TransactionLog files are named after a fixed pattern, with the log id zero-padded to 19 digits:

```java
public static String LOG_FILENAME_PATTERN = "%s.%019d";
String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
```
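The %019d padding means lexicographic file ordering matches numeric log-id ordering, which is convenient when scanning the tlog directory. A quick check:

```java
import java.util.Locale;

public class TlogName {
    public static final String LOG_FILENAME_PATTERN = "%s.%019d";

    /** Build a tlog file name, e.g. name("tlog", 7) -> "tlog.0000000000000000007". */
    public static String name(String tlogName, long id) {
        return String.format(Locale.ROOT, LOG_FILENAME_PATTERN, tlogName, id);
    }
}
```

Without the zero padding, "tlog.10" would sort before "tlog.2" as a string; with it, string order and numeric order agree.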
Each tlog file begins with a header record containing a magic string plus version number and the list of global strings used to abbreviate repeated field names:

```java
protected void writeLogHeader(LogCodec codec) throws IOException {
  long pos = fos.size();
  assert pos == 0;

  Map header = new LinkedHashMap<String,Object>();
  header.put("SOLR_TLOG", 1); // a magic string + version number
  header.put("strings", globalStringList);
  codec.marshal(header, fos);

  endRecord(pos);
}
```
TransactionLog.write() serializes an add command with the JavaBin codec into a memory buffer, then appends it to the file under a lock:

```java
public long write(AddUpdateCommand cmd, int flags) {
  LogCodec codec = new LogCodec(resolver);
  SolrInputDocument sdoc = cmd.getSolrInputDocument();

  try {
    // write the header record if this is the first write
    checkWriteHeader(codec, sdoc);

    // adaptive buffer sizing
    int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
    bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);

    MemOutputStream out = new MemOutputStream(new byte[bufSize]);
    codec.init(out);
    // write the tag
    codec.writeTag(JavaBinCodec.ARR, 3);
    // write the update type
    codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
    // write the version
    codec.writeLong(cmd.getVersion());
    // write the document
    codec.writeSolrInputDocument(cmd.getSolrInputDocument());
    lastAddSize = (int)out.size();

    synchronized (this) {
      long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
      assert pos != 0;

      /***
      System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
      if (pos != fos.size()) {
        throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
      }
      ***/

      out.writeAll(fos);
      endRecord(pos);
      // fos.flushBuffer(); // flush later
      return pos;
    }

  } catch (IOException e) {
    // TODO: reset our file pointer back to "pos", the start of this record.
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
  }
}
```
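One detail worth noting is the adaptive buffer sizing at the top of write(): the next buffer starts at roughly the last record's size plus one eighth plus 256 bytes of slack, capped at 1 MB, so that most records serialize without the memory buffer reallocating. Isolated as its own function:

```java
public class BufSize {
    /**
     * Mirrors the sizing line in TransactionLog.write(): grow the last
     * observed record size by ~1/8 plus 256 bytes of slack, capped at 1 MB.
     */
    public static int next(int lastAddSize) {
        return Math.min(1024 * 1024, lastAddSize + (lastAddSize >> 3) + 256);
    }
}
```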
ensureLog() lazily creates the current tlog file on first write:

```java
protected void ensureLog() {
  if (tlog == null) {
    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
    tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
  }
}
```
When the current tlog is closed off, UpdateLog hands it to addOldLog(), which also prunes the oldest logs that are no longer needed for recovery:

```java
protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
  if (oldLog == null) return;

  numOldRecords += oldLog.numRecords();

  int currRecords = numOldRecords;

  if (oldLog != tlog && tlog != null) {
    currRecords += tlog.numRecords();
  }

  while (removeOld && logs.size() > 0) {
    TransactionLog log = logs.peekLast();
    int nrec = log.numRecords();
    // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
    // we already have the limit of 10 log files.
    if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
      currRecords -= nrec;
      numOldRecords -= nrec;
      logs.removeLast().decref(); // dereference so it will be deleted when no longer in use
      continue;
    }

    break;
  }

  // don't incref... we are taking ownership from the caller.
  logs.addFirst(oldLog);
}
```
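The retention policy can be modeled with plain record counts: drop the oldest logs while the ones that would remain still hold at least numRecordsToKeep records, or while the 10-file limit is exceeded. A self-contained simulation (names illustrative; each deque entry is just a log's record count, newest first, like UpdateLog.logs):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LogRetention {
    static final int MAX_LOG_FILES = 10;

    /**
     * Add a closed log of newLogRecords records to the front of the deque,
     * pruning from the back (oldest) first. Returns total records kept.
     */
    public static int addOldLog(Deque<Integer> logs, int newLogRecords,
                                int numRecordsToKeep) {
        int currRecords = newLogRecords;
        for (int nrec : logs) currRecords += nrec;

        while (!logs.isEmpty()) {
            int nrec = logs.peekLast();  // oldest log
            // drop it if the rest still covers numRecordsToKeep,
            // or if we are at the file-count limit
            if (currRecords - nrec >= numRecordsToKeep
                    || logs.size() >= MAX_LOG_FILES) {
                currRecords -= nrec;
                logs.removeLast();
                continue;
            }
            break;
        }
        logs.addFirst(newLogRecords);
        return currRecords;
    }
}
```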
newMap() rotates the in-memory pointer maps, keeping the two previous generations:

```java
protected void newMap() {
  prevMap2 = prevMap;
  prevMapLog2 = prevMapLog;

  prevMap = map;
  prevMapLog = tlog;

  map = new HashMap<>();
}
```
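A sketch of the rotation together with the lookup order it enables; the lookup method here is illustrative (the real realtime-get path lives in UpdateLog.lookup), but shows why two old generations are kept: entries stay findable for two more rotations while a commit is in flight:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the map rotation in UpdateLog.newMap(): lookups consult up to
// three generations (map, prevMap, prevMap2), newest first.
public class MapRotation {
    Map<String, Long> map = new HashMap<>();
    Map<String, Long> prevMap, prevMap2;

    void newMap() {
        prevMap2 = prevMap;
        prevMap = map;
        map = new HashMap<>();
    }

    Long lookup(String id) {
        Long p = map.get(id);
        if (p == null && prevMap != null) p = prevMap.get(id);
        if (p == null && prevMap2 != null) p = prevMap2.get(id);
        return p;
    }
}
```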
Summary: this section covered the add path of DirectUpdateHandler2, the third step of the update index chain. The add path consists of two main parts: calling Lucene's IndexWriter to update the document, and writing the update into the update log. The IndexWriter side leads into Lucene's own indexing, which will be examined in a later article. The hard part of the update log is recovery, so this section also briefly introduced the basics of the update log and its concrete log file, the TransactionLog. The next section covers the commit operation, which likewise mainly involves updates to the update log.
Solr 4.8.0 Source Code Analysis (16): SolrCloud Indexing In Depth (3)
Original post: http://www.cnblogs.com/rcfeng/p/4104669.html