标签:本质 catch ups pos pipeline type 请求协议 解析 parent
本文将详细介绍批量获取API(Multi Get API)与Bulk API。其核心需要关注MultiGetRequest 。
从上面所知,mget及批量获取文档,通过add方法添加多个Item,每一个item代表一个文件获取请求,其相关字段已在get API中详细介绍,这里就不做过多详解。
Mget API使用示例
public static void testMget() {
RestHighLevelClient client = EsClient.getClient();
try {
MultiGetRequest request = new MultiGetRequest();
request.add("twitter", "_doc", "10");
request.add("twitter", "_doc", "11");
request.add("twitter", "_doc", "12");
request.add("gisdemo", "_doc", "10");
MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT);
System.out.println(result);
} catch (Throwable e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
返回的结果其本质是一个 GetResponse的数组,不会因为其中一个失败,整个请求失败,但其结果中会标明每一个是否成功。其返回结果类图如下:
其字段过滤(Source filtering)、路由等机制与Get API相同,故不重复讲解。
2、Bluk API详解
Bulk API可以在一次API调用中包含多个索引操作,例如更新索引,删除索引等。其API定义如下:
2.1BulkRequest详解
List<DocWriteRequest> requests:单个命令容器,DocWriteRequest的子类包括:IndexRequest、UpdateRequest、DeleteRequest。
private final Set<String> indices:requests涉及到的索引。
List<Object> payloads :有效载荷,6.4.0版本,貌似该字段意义不大,通常命令的请求体(负载数据)存放在DocWriteRequest对象中,例如IndexRequest的source字段。
通过add api为BulkRequest添加一个请求。
2.2 Bulk API请求格式详解
Bulk Rest请求协议基于如下格式:
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
其请求格式定义如下(restfull):
2.3 bulk API通用特性分析
2.3.1 版本管理
每一个Bulk条目拥有独自的version,存在于请求条目的item的元数据中。
2.3.2 路由
每一个Bulk条目各自生效。
2.3.3 Wait For Active Shards
通常可以设置BulkRequest#waitForActiveShards来要求Bulk批量执行之前要求处于激活的最小副本数。
2.3.4 Bulk Demo
public static final void testBulk() {
RestHighLevelClient client = EsClient.getClient();
try {
IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
.source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));
UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
.doc(new IndexRequest("twitter", "_doc", "11")
.source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));
BulkRequest request = new BulkRequest();
request.add(indexRequest);
request.add(updateRequest);
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println(failure);
continue;
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
System.out.println(indexRequest);
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
System.out.println(updateRequest);
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
System.out.println(deleteResponse);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
更多文章请关注公众号中间件兴趣圈:
Elasticsearch Multi Get、 Bulk API详解、原理与示例
标签:本质 catch ups pos pipeline type 请求协议 解析 parent
原文地址:https://blog.51cto.com/15023237/2559603