码迷,mamicode.com
首页 > 其他好文 > 详细

ElasticSearch:剖析query_and_fetch和query_then_fetch的区别

时间:2015-06-08 23:26:46      阅读:345      评论:0      收藏:0      [点我收藏+]

标签:搜索引擎   elasticsearch   内存溢出   

在ElasticSearch实际使用中遇到了一个问题,就是在数据量很大的情况下做聚合查询(aggregation)会导致内存溢出。当时看了文档,猜测修改search_type能避免内存溢出。实际测试发现,在数据量相同的情况下,search_type设置为query_and_fetch的聚合查询不会导致内存溢出,而默认的query_then_fetch则会内存溢出。本文就从源码层面分析这两种search_type的区别。

搜索请求处理过程

当一个搜索请求发送到节点之后,节点首先判断出这是一个搜索请求,然后将这个请求传递给TransportSearchAction。这个类负责处理所有的搜索请求。

TransportSearchAction负责根据搜索请求中的search_type将本次的搜索请求传递给对应的类。核心代码如下:

if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
    dfsQueryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {
    queryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) {
    dfsQueryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {
    queryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.SCAN) {
    scanAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.COUNT) {
    countAction.execute(searchRequest, listener);
}

query_and_fetch执行过程

query_and_fetch搜索对应的类为TransportSearchQueryAndFetchAction。它在执行的时候会启动一个异步任务,对应的代码如下:

@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    new AsyncAction(searchRequest, listener).start();
}

AsyncAction的入口方法是start,定义在它的基类BaseAsyncAction中。

BaseAsyncAction中的start方法首先确定有哪些分片需要处理,然后给每个需要处理的分片都启动一个异步的搜索任务,对应的关键代码如下:

for (final ShardIterator shardIt : shardsIts) {
    ...
    performFirstPhase(shardIndex, shardIt, shard);
    ...
}

performFirstPhase的作用就是生成一个内部的分片搜索请求,这种请求只针对一个分片,然后调用了一个子类的方法sendExecuteFirstPhase,让子类选择的处理方式。对应的代码如下:

sendExecuteFirstPhase(node, firstRequest, new SearchServiceListener<FirstResult>() {
    @Override
    public void onResult(FirstResult result) {
        onFirstPhaseResult(shardIndex, shard, result, shardIt);
    }
    ...
});

onFirstPhaseResult主要作用是调用子类的moveToSecondPhase。这个方法在executeFetchPhase之后才执行的,因此在其后面再介绍。

sendExecuteFirstPhase方法是抽象的。在搜索模式为query_and_fetch时,对分片请求的处理方式是调用sendExecuteFetch。子类对它的实现代码如下:

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
    searchService.sendExecuteFetch(node, request, listener);
}

sendExecuteFetch定义在SearchServiceTransportAction中,它会将分片搜索请求转发到对应的节点上。当然,如果对应的节点是自己这个节点,就不转发了,直接执行executeFetchPhase

executeFetchPhase的执行过程

executeFetchPhase中,首先初始化一个SearchContext,然后调用queryPhase,然后调用fetchPhase,最后清理SearchContext。对应的关键代码如下:

public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
    // 初始化SearchContext
    final SearchContext context = createAndPutContext(request);
    ...

    try {
        // 执行queryPhase
        ...
        queryPhase.execute(context);
        ...

        // 执行fetchPhase
        ...
        fetchPhase.execute(context);
        ...

        if (context.scroll() == null) {
           freeContext(context.id());
       }

       ...

        // 返回结果
        return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
    } catch (Throwable e) {
        ...
    } finally {
        // 清理SearchContext
        cleanContext(context);
    }
}

从上面的代码可以看出,queryPhasefetchPhase是连续执行的,这就是query_and_fetch的含义。当这两个阶段执行完成之后,如果不是滚动查询,就直接调用freeContextSearchContextactiveContexts中删除。

freeContextclearContext有什么区别呢?

freeContext的主要作用是从activeContexts中删掉指定的SearchContext,这样JVM就能将这部分的内存进行回收了。相关的代码如下:

public boolean freeContext(long id) {
    final SearchContext context = activeContexts.remove(id);
    ...
    context.close();
    ...
}

cleanContext的主要作用是释放部分内存空间,然后将指定的context从ThreadLocal中删掉。相关的代码如下:

private void cleanContext(SearchContext context) {
    context.clearReleasables(Lifetime.PHASE);
    SearchContext.removeCurrent();
}

SearchContext.clearReleasables的主要作用是将可以释放的内存释放掉。那怎样判断哪些资源是可以释放的呢?这个需要别的类在里面进行注册了。注册的时候调用addReleasable,告知哪个阶段可以清理哪些资源。相关的代码如下:

private Multimap<Lifetime, Releasable> clearables = null;
public void addReleasable(Releasable releasable, Lifetime lifetime) {
    ...
    clearables.put(lifetime, releasable);
}

public void clearReleasables(Lifetime lifetime) {
    ...
    List<Collection<Releasable>> releasables = new ArrayList<>();
    for (Lifetime lc : Lifetime.values()) {
        if (lc.compareTo(lifetime) > 0) {
            break;
        }
        releasables.add(clearables.removeAll(lc));
    }
    Releasables.close(Iterables.concat(releasables));
    ...
}

上面这段代码中的for循环是为了收集指定阶段,以及该阶段之前所有可清理的资源。因此clearReleasables的作用是清理掉指定阶段以及该阶段之前的所有可清理的资源。

SearchContext.removeCurrent代码如下。主要就是将当前的SearchContext删除掉。

private static ThreadLocal<SearchContext> current = new ThreadLocal<>();
public static void removeCurrent() {
    current.remove();
    QueryParseContext.removeTypes();
}

分析一下executeFetchPhase返回结果。从代码中可以看出,返回的结果中包含query的结果和fetch的结果。对应的两个类分别是QuerySearchResultFetchSearchResult

QuerySearchResult

QuerySearchResult定义如下:

public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider {

    private long id;
    private SearchShardTarget shardTarget;
    private int from;
    private int size;
    private TopDocs topDocs;
    private InternalFacets facets;
    private InternalAggregations aggregations;
    private Suggest suggest;
    private boolean searchTimedOut;

    ...
}

其中最重要的应该是topDocs和aggregations。

再看看TopDocs的定义:

public class TopDocs {
  public int totalHits;
  public ScoreDoc[] scoreDocs;
  private float maxScore;
  ...
}  

里面包含了搜索命中的结果数量、文档编号、文档匹配分数、最大匹配分数。再看看ScoreDoc是如何定义的。

public class ScoreDoc {
  public float score;
  public int doc;
  public int shardIndex;
  ...
}

这里的doc就是内部的文档编号,可以通过IndexSearcher#doc(int)方法获取对应的文档内容。

因此QuerySearchResult中只包含了内部的文档编号、文档的匹配分值。

FetchSearchResult

FetchSearchResult的定义如下:

public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider {
    private long id;
    private SearchShardTarget shardTarget;
    private InternalSearchHits hits;
    private transient int counter;
    ...
}

它包含了InternalSearchHitsInternalSearchHits的定义如下:

public class InternalSearchHits implements SearchHits {
    private InternalSearchHit[] hits;
    public long totalHits;
    private float maxScore;
    ...
}

它包含了InternalSearchHitInternalSearchHit的定义如下:

public class InternalSearchHit implements SearchHit {
    ...
    private Map<String, Object> sourceAsMap;
    private byte[] sourceAsBytes;
    ...
}

它包含了文档的原始内容和解析后的内容。

因此FetchSearchResult包含了文档的具体内容。

moveToSecondPhase执行过程

在上面的executeFetchPhase执行完成之后,得到query结果和fetch结果之后,就执行moveToSecondPhase了。

moveToSecondPhaseBaseAsyncAction中是一个抽象方法,它在子类TransportSearchQueryAndFetchAction中的定义如下:

@Override
protected void moveToSecondPhase() throws Exception {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
        @Override
        public void run() {
            ...
            sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
            ...
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
        }
    });
}

从代码中可以看出,搜索的第二阶段是在search线程池中提交一个任务,首先是对分片结果进行整体排序,然后将搜索结果进行合并。这里面分别调用了searchPhaseController.sortDocssearchPhaseController.merge两个方法。

首先看看sortDocs做了什么。它的关键代码如下:

public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
    ...
    TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
    return mergedTopDocs.scoreDocs;
}

从代码中可以看出,对文档进行排序的时候调用了TopDocs.merge方法。这个方法的关键代码如下:

public static TopDocs merge(Sort sort, int start, int size, TopDocs[] shardHits) throws IOException {
    final PriorityQueue<ShardRef> queue;
    ...
    queue = new ScoreMergeSortQueue(shardHits);
    ...

    for(int shardIDX=0;shardIDX<shardHits.length;shardIDX++) {
        ...
        queue.add(new ShardRef(shardIDX));
        ...
    }

    ...

    // hits储存最终的结果
    hits = new ScoreDoc[Math.min(size, availHitCount - start)];

    // 每次取出最小值放到结果中
    while (hitUpto < numIterOnHits) {
        ...

        // 利用优先级队列,从所有分片中取出分值最小的文档,加入到hits结果中
        ShardRef ref = queue.pop();
        final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex++];
        hit.shardIndex = ref.shardIndex;
        ...
        hits[hitUpto - start] = hit;
        ...

        hitUpto++;

        // 数组中的数据读取完了,就从优先级队列中排除
        if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) {
            queue.add(ref);
        }
    }
}

这段代码刚开始看起来有点复杂,看不太懂。其实它是归并排序的变种。因为普通的归并排序只针对两个数组。排序的时候每次从输入的两个数组中取出最小的元素,放到结果中。而这里输入的数组会有多个,每次也要取出最小的元素,放到结果中。如何快速的从多个数组中取出最小的元素呢?这里利用了优先级队列。

再看searchPhaseController.merge做了什么呢?主要是合并facet结果、aggregation结果、hits结果、suggest结果、count结果。这里就看一下最关键的hits是怎样合并的。相关的代码如下:

List<InternalSearchHit> hits = new ArrayList<>();
for (ScoreDoc shardDoc : sortedDocs) {
    FetchSearchResult fetchResult = ...;
    ...
    int index = fetchResult.counterGetAndIncrement();
    if (index < fetchResult.hits().internalHits().length) {
        ...
        InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
        ...
        hits.add(searchHit);
    }
    ...
}

hit的合并过程就是将fetchResult中所有文档的具体内容组合成一个新的列表。其他的facet、aggregations、suggest、count的合并过程也是类似。

query_and_fetch下的fromsize

举个例子,如果在搜索的时候指定search_typequery_and_fetch,再指定size为10,那么就会返回50个结果。这是为什么呢?原来,在fetch的时候就已经把fromsize参数用掉了,导致每个分片都返回了10个文档,如果有5个分片的话,那么最后合并的结果就是50个。

如果query_and_fetch情况下这两和参数的作用和query_then_fetch的行为一样。那么一次就要取出from+size个文档。如果from比较大,那么取出的文档足以撑爆内存。而如果在fetch阶段就直接使用这两个参数,每个分片最多就取出size个文档,from稍微大一些也没有关系。因此,这种行为的设计也是可以理解的。

query_then_fetch执行过程

query_then_fetch的搜索逻辑在TransportSearchQueryThenFetchAction中。它在执行的时候会启动一个异步任务。相关代码如下:

@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    new AsyncAction(searchRequest, listener).start();
}

start方法定义在它的基类BaseAsyncTask中,主要作用是给每个需要搜索的分片单独启动一个异步任务,并让子类选择处理方式。在第一步处理完成之后,会调用子类中的moveToSecondPhase继续执行第二阶段的计算任务。在query_then_fetch中,子类的处理方式是sendExecuteQuery。相关的代码如下:

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
    searchService.sendExecuteQuery(node, request, listener);
}

sendExecuteQuery的作用是什么呢?它会将query请求发送到对应的节点上。如果对应的节点就是自己,那么就直接调用executeQueryPhase,执行query任务。

executeQueryPhase的主要作用是初始化SearchContext,然后执行queryPhase,最后清理SearchContext。返回的结果是QuerySearchResult,它只包含文档编号和必要的排序分值。相关的代码如下:

public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
    final SearchContext context = createAndPutContext(request);
    try {
       ...
       queryPhase.execute(context);
       ...
       if (context.searchType() == SearchType.COUNT) {
           freeContext(context.id());
       }
       ...
       return context.queryResult();
    } catch(...) {
        ...
    } finally {
       ...
       cleanContext(context);
    }
}

代码里面可以看到一个细节,就是只有在search_typecount的时候才会调用freeContext。也就是说,如果search_type不是count,那么SearchContext仍然会驻留在内存中。cleanContextfreeContext的区别在前面的文章里面讲过了。

queryPhase结束之后,就开始第二阶段了。第二阶段从moveToSecondPhase开始。它的代码定义在TransportSearchQueryThenFetchAction中。主要的作用是将第一阶段获取的文档编号进行排序。排序完成之后再根据文档编号获取文档里面实际的内容。相关的代码如下:

@Override
protected void moveToSecondPhase() throws Exception {
    ...
    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
    searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
    ...
    final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
    for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
        QuerySearchResult queryResult = firstResults.get(entry.index);
        ...
        executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
    }
}

sortDocs前面已经讲过了,作用是将文档编号根据每个文档的匹配分值进行排序。

executeFetch相关的代码如下。它的作用是调用searchService开启一个异步任务,根据文档编号获取文档的具体内容,并将结果存放到fetchResults中。根据counter判断如果所有的fetch任务都执行完了,就调用finishHim来完成本次查询结果。

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
        @Override
        public void onResult(FetchSearchResult result) {
            ...
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }
        ...
    });
}

sendExecuteFetch最后会调用executeFetchPhase。它的关键代码如下:

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
    final SearchContext context = findContext(request.id());
    ...
    fetchPhase.execute(context);
    ...
    freeContext(request.id());
    ...
    return context.fetchResult();
}

从代码中可以看出,在fetch阶段结束之后才会释放SearchContext

finishHim相关的代码如下。它的作用是合并每个分片的查询结果,让后将合并结果通知给listener。让它完成最后的查询结果。searchPhaseController.merge在前面讲过了,主要作用是

private void finishHim() {
    ...
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
        @Override
        public void run() {
            ...
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
            ...
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
            ...
        }
    });
    ...
}

搜索模式对聚合查询的影响

query_and_fetchquery_then_fetch下的聚合查询有什么区别呢?首先,前面已经讲过了,query_and_fetchexecuteFetchPhase的时候就把SearchContext释放掉了。

public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {
    final SearchContext context = createAndPutContext(request);
    ...
    queryPhase.execute(context);
    ...
    fetchPhase.execute(context);
    ...
    freeContext(context.id());
    ...
    return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

而在query_then_fetch中,query阶段不会释放SearchContext,在fetch阶段结束之后才会释放SearchContext。两段相关的代码如下:

public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
    final SearchContext context = createAndPutContext(request);
    ...
    queryPhase.execute(context);
    ...
    return context.queryResult();
}

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
    final SearchContext context = findContext(request.id());
    ...
    fetchPhase.execute(context);
    ...
    freeContext(request.id());
    ...
    return context.fetchResult();
}

在聚类搜索的时候,内存占用比较大的是FieldData数据,这些数据储存在SearchContext中。在query_then_fetch的搜索模式下,载入FieldData是在query阶段完成的。而query阶段不会释放内存,因此内存中会存放所有分片的FieldData,从而导致内存溢出。

但是我还是有个疑问,为什么不在query阶段结束之后立即释放SearchContext中的FieldData呢?这样也就不会有内存溢出的问题了。

ElasticSearch:剖析query_and_fetch和query_then_fetch的区别

标签:搜索引擎   elasticsearch   内存溢出   

原文地址:http://blog.csdn.net/caipeichao2/article/details/46418413

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!