在ElasticSearch实际使用中遇到了一个问题,就是在数据量很大的情况下做聚合查询(aggregation)会导致内存溢出。当时看了文档,猜测修改search_type
能避免内存溢出。实际测试发现,在数据量相同的情况下,search_type
设置为query_and_fetch
的聚合查询不会导致内存溢出,而默认的query_then_fetch
则会内存溢出。本文就从源码层面分析这两种search_type
的区别。
当一个搜索请求发送到节点之后,节点首先判断出这是一个搜索请求,然后将这个请求传递给TransportSearchAction
。这个类负责处理所有的搜索请求。
TransportSearchAction
负责根据搜索请求中的search_type
将本次的搜索请求传递给对应的类。核心代码如下:
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
dfsQueryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {
queryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) {
dfsQueryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {
queryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.SCAN) {
scanAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.COUNT) {
countAction.execute(searchRequest, listener);
}
query_and_fetch
执行过程query_and_fetch
搜索对应的类为TransportSearchQueryAndFetchAction
。它在执行的时候会启动一个异步任务,对应的代码如下:
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
AsyncAction
的入口方法是start
,定义在它的基类BaseAsyncAction
中。
BaseAsyncAction
中的start
方法首先确定有哪些分片需要处理,然后给每个需要处理的分片都启动一个异步的搜索任务,对应的关键代码如下:
for (final ShardIterator shardIt : shardsIts) {
...
performFirstPhase(shardIndex, shardIt, shard);
...
}
performFirstPhase
的作用就是生成一个内部的分片搜索请求,这种请求只针对一个分片,然后调用了一个子类的方法sendExecuteFirstPhase
,让子类选择的处理方式。对应的代码如下:
sendExecuteFirstPhase(node, firstRequest, new SearchServiceListener<FirstResult>() {
@Override
public void onResult(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
...
});
onFirstPhaseResult
主要作用是调用子类的moveToSecondPhase
。这个方法在executeFetchPhase
之后才执行的,因此在其后面再介绍。
sendExecuteFirstPhase
方法是抽象的。在搜索模式为query_and_fetch
时,对分片请求的处理方式是调用sendExecuteFetch
。子类对它的实现代码如下:
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener);
}
sendExecuteFetch
定义在SearchServiceTransportAction
中,它会将分片搜索请求转发到对应的节点上。当然,如果对应的节点是自己这个节点,就不转发了,直接执行executeFetchPhase
。
executeFetchPhase
的执行过程executeFetchPhase
中,首先初始化一个SearchContext
,然后调用queryPhase
,然后调用fetchPhase
,最后清理SearchContext
。对应的关键代码如下:
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
// 初始化SearchContext
final SearchContext context = createAndPutContext(request);
...
try {
// 执行queryPhase
...
queryPhase.execute(context);
...
// 执行fetchPhase
...
fetchPhase.execute(context);
...
if (context.scroll() == null) {
freeContext(context.id());
}
...
// 返回结果
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
...
} finally {
// 清理SearchContext
cleanContext(context);
}
}
从上面的代码可以看出,queryPhase
和fetchPhase
是连续执行的,这就是query_and_fetch
的含义。当这两个阶段执行完成之后,如果不是滚动查询,就直接调用freeContext
将SearchContext
从activeContexts
中删除。
freeContext
和clearContext
有什么区别呢?
freeContext
的主要作用是从activeContexts
中删掉指定的SearchContext
,这样JVM就能将这部分的内存进行回收了。相关的代码如下:
public boolean freeContext(long id) {
final SearchContext context = activeContexts.remove(id);
...
context.close();
...
}
cleanContext
的主要作用是释放部分内存空间,然后将指定的context从ThreadLocal
中删掉。相关的代码如下:
private void cleanContext(SearchContext context) {
context.clearReleasables(Lifetime.PHASE);
SearchContext.removeCurrent();
}
SearchContext.clearReleasables
的主要作用是将可以释放的内存释放掉。那怎样判断哪些资源是可以释放的呢?这个需要别的类在里面进行注册了。注册的时候调用addReleasable
,告知哪个阶段可以清理哪些资源。相关的代码如下:
private Multimap<Lifetime, Releasable> clearables = null;
public void addReleasable(Releasable releasable, Lifetime lifetime) {
...
clearables.put(lifetime, releasable);
}
public void clearReleasables(Lifetime lifetime) {
...
List<Collection<Releasable>> releasables = new ArrayList<>();
for (Lifetime lc : Lifetime.values()) {
if (lc.compareTo(lifetime) > 0) {
break;
}
releasables.add(clearables.removeAll(lc));
}
Releasables.close(Iterables.concat(releasables));
...
}
上面这段代码中的for循环是为了收集指定阶段,以及该阶段之前所有可清理的资源。因此clearReleasables
的作用是清理掉指定阶段以及该阶段之前的所有可清理的资源。
SearchContext.removeCurrent
代码如下。主要就是将当前的SearchContext
删除掉。
private static ThreadLocal<SearchContext> current = new ThreadLocal<>();
public static void removeCurrent() {
current.remove();
QueryParseContext.removeTypes();
}
分析一下executeFetchPhase
返回结果。从代码中可以看出,返回的结果中包含query的结果和fetch的结果。对应的两个类分别是QuerySearchResult
和FetchSearchResult
。
QuerySearchResult
QuerySearchResult
定义如下:
public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
private int from;
private int size;
private TopDocs topDocs;
private InternalFacets facets;
private InternalAggregations aggregations;
private Suggest suggest;
private boolean searchTimedOut;
...
}
其中最重要的应该是topDocs和aggregations。
再看看TopDocs
的定义:
public class TopDocs {
public int totalHits;
public ScoreDoc[] scoreDocs;
private float maxScore;
...
}
里面包含了搜索命中的结果数量、文档编号、文档匹配分数、最大匹配分数。再看看ScoreDoc
是如何定义的。
public class ScoreDoc {
public float score;
public int doc;
public int shardIndex;
...
}
这里的doc
就是内部的文档编号,可以通过IndexSearcher#doc(int)
方法获取对应的文档内容。
因此QuerySearchResult
中只包含了内部的文档编号、文档的匹配分值。
FetchSearchResult
FetchSearchResult
的定义如下:
public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider {
private long id;
private SearchShardTarget shardTarget;
private InternalSearchHits hits;
private transient int counter;
...
}
它包含了InternalSearchHits
。InternalSearchHits
的定义如下:
public class InternalSearchHits implements SearchHits {
private InternalSearchHit[] hits;
public long totalHits;
private float maxScore;
...
}
它包含了InternalSearchHit
。InternalSearchHit
的定义如下:
public class InternalSearchHit implements SearchHit {
...
private Map<String, Object> sourceAsMap;
private byte[] sourceAsBytes;
...
}
它包含了文档的原始内容和解析后的内容。
因此FetchSearchResult
包含了文档的具体内容。
moveToSecondPhase
执行过程在上面的executeFetchPhase
执行完成之后,得到query结果和fetch结果之后,就执行moveToSecondPhase
了。
moveToSecondPhase
在BaseAsyncAction
中是一个抽象方法,它在子类TransportSearchQueryAndFetchAction
中的定义如下:
@Override
protected void moveToSecondPhase() throws Exception {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
...
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
...
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
});
}
从代码中可以看出,搜索的第二阶段是在search线程池中提交一个任务,首先是对分片结果进行整体排序,然后将搜索结果进行合并。这里面分别调用了searchPhaseController.sortDocs
和searchPhaseController.merge
两个方法。
首先看看sortDocs
做了什么。它的关键代码如下:
public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
...
TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
return mergedTopDocs.scoreDocs;
}
从代码中可以看出,对文档进行排序的时候调用了TopDocs.merge
方法。这个方法的关键代码如下:
public static TopDocs merge(Sort sort, int start, int size, TopDocs[] shardHits) throws IOException {
final PriorityQueue<ShardRef> queue;
...
queue = new ScoreMergeSortQueue(shardHits);
...
for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) {
...
queue.add(new ShardRef(shardIDX));
...
}
...
// hits储存最终的结果
hits = new ScoreDoc[Math.min(size, availHitCount - start)];
// 每次取出最小值放到结果中
while (hitUpto < numIterOnHits) {
...
// 利用优先级队列,从所有分片中取出分值最小的文档,加入到hits结果中
ShardRef ref = queue.pop();
final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex++];
hit.shardIndex = ref.shardIndex;
...
hits[hitUpto - start] = hit;
...
hitUpto++;
// 数组中的数据读取完了,就从优先级队列中排除
if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) {
queue.add(ref);
}
}
}
这段代码刚开始看起来有点复杂,看不太懂。其实它是归并排序的变种。因为普通的归并排序只针对两个数组。排序的时候每次从输入的两个数组中取出最小的元素,放到结果中。而这里输入的数组会有多个,每次也要取出最小的元素,放到结果中。如何快速的从多个数组中取出最小的元素呢?这里利用了优先级队列。
再看searchPhaseController.merge
做了什么呢?主要是合并facet结果、aggregation结果、hits结果、suggest结果、count结果。这里就看一下最关键的hits是怎样合并的。相关的代码如下:
List<InternalSearchHit> hits = new ArrayList<>();
for (ScoreDoc shardDoc : sortedDocs) {
FetchSearchResult fetchResult = ...;
...
int index = fetchResult.counterGetAndIncrement();
if (index < fetchResult.hits().internalHits().length) {
...
InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
...
hits.add(searchHit);
}
...
}
hit的合并过程就是将fetchResult
中所有文档的具体内容组合成一个新的列表。其他的facet、aggregations、suggest、count的合并过程也是类似。
query_and_fetch
下的from
和size
举个例子,如果在搜索的时候指定search_type
为query_and_fetch
,再指定size
为10,那么就会返回50个结果。这是为什么呢?原来,在fetch的时候就已经把from
和size
参数用掉了,导致每个分片都返回了10个文档,如果有5个分片的话,那么最后合并的结果就是50个。
如果query_and_fetch
情况下这两和参数的作用和query_then_fetch
的行为一样。那么一次就要取出from
+size
个文档。如果from
比较大,那么取出的文档足以撑爆内存。而如果在fetch阶段就直接使用这两个参数,每个分片最多就取出size
个文档,from
稍微大一些也没有关系。因此,这种行为的设计也是可以理解的。
query_then_fetch
执行过程query_then_fetch
的搜索逻辑在TransportSearchQueryThenFetchAction
中。它在执行的时候会启动一个异步任务。相关代码如下:
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
start
方法定义在它的基类BaseAsyncTask
中,主要作用是给每个需要搜索的分片单独启动一个异步任务,并让子类选择处理方式。在第一步处理完成之后,会调用子类中的moveToSecondPhase
继续执行第二阶段的计算任务。在query_then_fetch
中,子类的处理方式是sendExecuteQuery
。相关的代码如下:
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
searchService.sendExecuteQuery(node, request, listener);
}
sendExecuteQuery
的作用是什么呢?它会将query请求发送到对应的节点上。如果对应的节点就是自己,那么就直接调用executeQueryPhase
,执行query任务。
executeQueryPhase
的主要作用是初始化SearchContext
,然后执行queryPhase
,最后清理SearchContext
。返回的结果是QuerySearchResult
,它只包含文档编号和必要的排序分值。相关的代码如下:
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
final SearchContext context = createAndPutContext(request);
try {
...
queryPhase.execute(context);
...
if (context.searchType() == SearchType.COUNT) {
freeContext(context.id());
}
...
return context.queryResult();
} catch(...) {
...
} finally {
...
cleanContext(context);
}
}
代码里面可以看到一个细节,就是只有在search_type
为count
的时候才会调用freeContext
。也就是说,如果search_type
不是count
,那么SearchContext
仍然会驻留在内存中。cleanContext
和freeContext
的区别在前面的文章里面讲过了。
当queryPhase
结束之后,就开始第二阶段了。第二阶段从moveToSecondPhase
开始。它的代码定义在TransportSearchQueryThenFetchAction
中。主要的作用是将第一阶段获取的文档编号进行排序。排序完成之后再根据文档编号获取文档里面实际的内容。相关的代码如下:
@Override
protected void moveToSecondPhase() throws Exception {
...
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
...
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
...
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
sortDocs
前面已经讲过了,作用是将文档编号根据每个文档的匹配分值进行排序。
executeFetch
相关的代码如下。它的作用是调用searchService
开启一个异步任务,根据文档编号获取文档的具体内容,并将结果存放到fetchResults
中。根据counter判断如果所有的fetch任务都执行完了,就调用finishHim
来完成本次查询结果。
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
...
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
...
});
}
sendExecuteFetch
最后会调用executeFetchPhase
。它的关键代码如下:
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
final SearchContext context = findContext(request.id());
...
fetchPhase.execute(context);
...
freeContext(request.id());
...
return context.fetchResult();
}
从代码中可以看出,在fetch阶段结束之后才会释放SearchContext
。
finishHim
相关的代码如下。它的作用是合并每个分片的查询结果,让后将合并结果通知给listener。让它完成最后的查询结果。searchPhaseController.merge
在前面讲过了,主要作用是
private void finishHim() {
...
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
...
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
...
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
...
}
});
...
}
query_and_fetch
和query_then_fetch
下的聚合查询有什么区别呢?首先,前面已经讲过了,query_and_fetch
在executeFetchPhase
的时候就把SearchContext
释放掉了。
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
final SearchContext context = createAndPutContext(request);
...
queryPhase.execute(context);
...
fetchPhase.execute(context);
...
freeContext(context.id());
...
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
而在query_then_fetch
中,query阶段不会释放SearchContext
,在fetch阶段结束之后才会释放SearchContext
。两段相关的代码如下:
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
final SearchContext context = createAndPutContext(request);
...
queryPhase.execute(context);
...
return context.queryResult();
}
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
final SearchContext context = findContext(request.id());
...
fetchPhase.execute(context);
...
freeContext(request.id());
...
return context.fetchResult();
}
在聚类搜索的时候,内存占用比较大的是FieldData
数据,这些数据储存在SearchContext
中。在query_then_fetch
的搜索模式下,载入FieldData是在query阶段完成的。而query阶段不会释放内存,因此内存中会存放所有分片的FieldData,从而导致内存溢出。
但是我还是有个疑问,为什么不在query阶段结束之后立即释放SearchContext中的FieldData呢?这样也就不会有内存溢出的问题了。
ElasticSearch:剖析query_and_fetch和query_then_fetch的区别
原文地址:http://blog.csdn.net/caipeichao2/article/details/46418413