码迷,mamicode.com
首页 > 编程语言 > 详细

springboot整合ElasticSearch

时间:2021-01-28 12:03:55      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:stop   boot   str   version   from   创建   rmq   init   each   

yml

spring:
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: 192.168.209.160:9200
          # A '#' only starts a YAML comment when it is preceded by whitespace, so the
          # comments below must be separated from the values.
          connection-timeout: 10000 # timeout for connecting to ES, in milliseconds (original note: default is 10 s / 10000 ms)
          socket-timeout: 10000 # read/write timeout, in milliseconds (original note: default is 5 s / 5000 ms)
  elasticsearch:
    rest:
      uris: 192.168.209.160:9200
#     The two properties below are no longer recommended in newer Spring Boot versions. Port 9300 is
#     the transport port Elasticsearch nodes use to talk to each other (used by the transport client).
#     Connecting with RestHighLevelClient over port 9200 is recommended instead.
#     cluster-nodes: 127.0.0.1:9300
#     cluster-name: helloElasticsearch

pom 

    <!-- Lombok, declared optional so it is not passed on to projects depending on this one.
         No Lombok annotation is used in the classes shown alongside this snippet. -->
    <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Hutool. The ID generator class imports cn.hutool.core.* (Snowflake, NetUtil, IdUtil).
             NOTE(review): only hutool-captcha is declared; presumably hutool-core arrives
             transitively - confirm, or depend on hutool-core directly. -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-captcha</artifactId>
            <version>5.2.0</version>
        </dependency>
        <!-- Spring MVC, needed for the @RestController endpoints.
             No version is given here; presumably it is managed by the Spring Boot parent/BOM. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Data Elasticsearch starter; the utility class injects RestHighLevelClient. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

 

Controller

package com.fwz.tproject.testfunction.controller;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fwz.tproject.testfunction.service.ElasticSearchUtils;
import com.fwz.tproject.testfunction.service.IdGeneratorSnowflake;
import com.fwz.tproject.testfunction.service.OrderService;

/**
 * REST endpoints under {@code /test} that exercise the Elasticsearch helper service
 * against the {@code fwztest_index} index.
 *
 * @author 冯文哲
 * @version 2018-06-11
 */
@RestController
@RequestMapping(value = "/test")
public class MainController {

    /** Index every endpoint of this controller works on. */
    private static final String INDEX = "fwztest_index";

    /** Number of sample documents written by {@link #addDoc()}. */
    private static final int SAMPLE_COUNT = 1000;

    @Autowired
    private IdGeneratorSnowflake idGenerator;
    @Autowired
    ElasticSearchUtils utilsService;

    /**
     * Creates the demo index with 5 shards, 1 replica and an empty mapping argument.
     *
     * @return a fixed success or failure message, depending on the service result
     */
    @RequestMapping(value = "createIndex")
    public String elasticsearch() {
        boolean created = utilsService.createIndex(INDEX, 5, 1, "");
        return created ? "创建成功" : "创建失败";
    }

    /**
     * Submits 1000 generated sample documents to the demo index.
     *
     * @return a fixed success message; the results of the individual writes are not inspected
     */
    @RequestMapping(value = "addDoc")
    public String addDoc() {
        int seq = 0;
        while (seq < SAMPLE_COUNT) {
            Map<String, Object> doc = sampleDocument(seq);
            // The document id is a second, separately generated snowflake id.
            String docId = String.valueOf(idGenerator.snowflakeId());
            utilsService.addDoc(INDEX, docId, doc);
            seq++;
        }
        return "新增成功";
    }

    /** Builds the sample document for loop position {@code seq}. */
    private Map<String, Object> sampleDocument(int seq) {
        Map<String, Object> doc = new ConcurrentHashMap<String, Object>();
        doc.put("author_id", idGenerator.snowflakeId());
        doc.put("title", "这有" + seq + "个中国人");
        doc.put("content", "其中有" + (seq - 1) + "个老黑");
        doc.put("create_date", new Date());
        return doc;
    }

    /**
     * Deletes the document with the given id from the demo index.
     *
     * @param id document id taken from the request
     * @return a fixed success message; the service result is not inspected
     */
    @RequestMapping(value = "deleteDoc")
    public String deleteDoc(String id) {
        utilsService.deleteDoc(INDEX, id);
        return "删除成功";
    }

    /**
     * Requests an update of the document with the given id, passing an empty source string.
     *
     * @param id document id taken from the request
     * @return a fixed success message; the service result is not inspected
     */
    @RequestMapping(value = "updateDoc")
    public String updateDoc(String id) {
        utilsService.updateDoc(INDEX, id, "");
        return "修改成功";
    }

    /**
     * Looks up one document of the demo index.
     *
     * @param id document id taken from the request
     * @return whatever the helper service returns for that id
     */
    @RequestMapping(value = "selectDoc")
    public Map<String, Object> selectDoc(String id) {
        Map<String, Object> found = utilsService.getDoc(INDEX, id);
        return found;
    }

}

Utils

package com.fwz.tproject.testfunction.service;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

@Service
@EnableAsync
public class ElasticSearchUtils {
    @Autowired
    private RestHighLevelClient restClient;
    Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class.getName());

    /**
     * Creates an index with the requested shard/replica settings and mapping.
     *
     * @param indexName name of the index to create
     * @param shards    number of primary shards; {@code null} falls back to 5
     * @param replicas  number of replica shards; {@code null} falls back to 1
     * @param mapping   mapping definition as a JSON string; {@code null} or empty falls back to
     *                  the built-in demo mapping (author_id, title, content, create_date)
     * @return {@code true} if the create request was acknowledged; {@code false} if it was not
     *         acknowledged or an {@link IOException} occurred
     */
    public boolean createIndex(String indexName, Integer shards, Integer replicas, String mapping) {
        logger.info(restClient.toString());

        // Honour the caller's values; the previous hard-coded 5/1 remain as defaults.
        int shardCount = shards != null ? shards : 5;
        int replicaCount = replicas != null ? replicas : 1;

        // Demo mapping used when the caller supplies none.
        // NOTE(review): the "content" field uses the ik_max_word analyzer, which presumably
        // requires the IK analysis plugin on the cluster - confirm before relying on the default.
        String defaultMapping = "{\"properties\":{"
                + "\"author_id\":{\"type\":\"long\"},"
                + "\"title\":{\"type\":\"text\",\"analyzer\":\"standard\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},"
                + "\"content\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},"
                + "\"create_date\":{\"type\":\"date\"}"
                + "}}";
        String mappingJson = (mapping != null && !mapping.isEmpty()) ? mapping : defaultMapping;

        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                .put("number_of_shards", shardCount)
                .put("number_of_replicas", replicaCount));
        request.mapping(mappingJson, XContentType.JSON);
        request.setTimeout(TimeValue.timeValueMinutes(1));

        try {
            CreateIndexResponse createIndexResponse = restClient.indices().create(request, RequestOptions.DEFAULT);
            boolean acknowledged = createIndexResponse.isAcknowledged();
            logger.info("是否获取ACK:" + acknowledged);
            return acknowledged;
        } catch (IOException e) {
            // Pass the exception itself so the stack trace reaches the log.
            logger.error("创建索引失败: " + indexName, e);
            return false;
        }
    }

    /**
     * Indexes a single document.
     *
     * <p>The method is annotated with {@code @Async}; it is expected to run asynchronously when
     * invoked through the Spring bean, which is why the outcome is wrapped in a {@link Future}.
     *
     * @param index  index name
     * @param id     document id; when {@code null} or empty no id is set on the request
     *               (the original note says Elasticsearch then assigns its own id)
     * @param source document fields
     * @return a future holding {@code true} when the index call returned, or {@code false}
     *         when an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:10:42
     */
    @Async
    public Future<Boolean> addDoc(String index, String id, Map<String, Object> source) {
        // The source may also be given as a JSON string or an XContentBuilder; a Map is used here.
        IndexRequest indexRequest = new IndexRequest(index).source(source);
        if (id != null && !id.isEmpty()) {
            indexRequest.id(id);
        }
        try {
            IndexResponse res = restClient.index(indexRequest, RequestOptions.DEFAULT);
            logger.info("新增数据成功,ID为: " + res.getId());
            return new AsyncResult<>(true);
        } catch (IOException e) {
            // Log through the logger (with stack trace) instead of printing to stderr.
            logger.error("新增数据失败,index: " + index + ",ID: " + id, e);
            return new AsyncResult<>(false);
        }
    }

    /**
     * Deletes a single document.
     *
     * @param index index name
     * @param id    document id
     * @return {@code true} only when Elasticsearch reports the document as deleted;
     *         {@code false} for any other result (for example not found) or when an
     *         {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:19:26
     */
    public boolean deleteDoc(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            DeleteResponse res = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
            logger.info(res.getResult().toString());
            // Report success only when a document was actually removed; previously the method
            // logged success and then returned false unconditionally.
            if (res.getResult() == DocWriteResponse.Result.DELETED) {
                logger.info("删除数据成功,ID为: " + res.getId());
                return true;
            }
            logger.warn("删除数据未生效,ID为: " + res.getId());
        } catch (IOException e) {
            logger.error("删除数据失败,index: " + index + ",ID: " + id, e);
        }
        return false;
    }

    /**
     * Applies a partial update to a single document.
     *
     * @param index  index name
     * @param id     document id
     * @param source partial document as a JSON string
     * @return {@code true} when the update call returned; {@code false} when {@code source} is
     *         {@code null} or empty, or when an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:25:43
     */
    public boolean updateDoc(String index, String id, String source) {
        if (source == null || source.isEmpty()) {
            // Nothing to apply; an empty body is not a valid JSON document.
            logger.warn("修改数据失败,source为空,ID为: " + id);
            return false;
        }
        // A JSON string must be passed together with its content type. The single-argument
        // doc(source) call binds to the Object... key/value overload, which rejects an odd
        // number of arguments.
        UpdateRequest updateRequest = new UpdateRequest(index, id).doc(source, XContentType.JSON);
        try {
            UpdateResponse res = restClient.update(updateRequest, RequestOptions.DEFAULT);
            logger.info("修改数据成功,ID为: " + res.getId());
            return true;
        } catch (IOException e) {
            logger.error("修改数据失败,index: " + index + ",ID: " + id, e);
            return false;
        }
    }

    /**
     * Fetches a single document by id.
     *
     * @param index index name
     * @param id    document id
     * @return the document source as a map, or {@code null} when the document does not exist,
     *         has no source, or an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 17:27:33
     */
    public Map<String, Object> getDoc(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        try {
            GetResponse res = restClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> map = res.isExists() ? res.getSourceAsMap() : null;
            if (map == null) {
                // A missing document has no source map; calling toString() on it used to throw
                // a NullPointerException.
                logger.info("未查询到数据,ID为: " + res.getId());
                return null;
            }
            logger.info("查询数据成功,ID为: " + res.getId());
            logger.info("查询数据成功,字符串数据为: " + res.getSourceAsString());
            logger.info("查询数据成功,Map数据为: " + map.toString());
            return map;
        } catch (IOException e) {
            logger.error("查询数据失败,index: " + index + ",ID: " + id, e);
            return null;
        }
    }

    /**
     * Bulk demo: sends one index, one delete and one update operation in a single request
     * and logs the outcome of each item.
     *
     * @param index index name used by all three operations
     * @param id    document id used by the delete and the update operation
     * @return {@code true} when the bulk call returned and no item failed; {@code false} when
     *         at least one item failed or an {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:35:34
     */
    public Boolean bulkRequest(String index, String id) {
        BulkRequest request = new BulkRequest();
        // The (empty) maps stand in for the data to index / update. The map must come first and
        // the content type second: the (XContentType, Object...) overloads expect key/value
        // pairs and reject a single object.
        request.add(new IndexRequest(index).source(new HashMap<String, Object>(), XContentType.JSON));
        request.add(new DeleteRequest(index, id));
        request.add(new UpdateRequest(index, id).doc(new HashMap<String, Object>(), XContentType.JSON));

        try {
            BulkResponse bulkResponse = restClient.bulk(request, RequestOptions.DEFAULT);
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    logger.warn(failure.getMessage());
                    continue;
                }
                // Index, create, update and delete responses all expose their result through
                // DocWriteResponse, so no per-operation cast is needed.
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                logger.info(itemResponse.getResult().toString());
            }
            // Do not report success when individual items failed.
            return !bulkResponse.hasFailures();
        } catch (IOException e) {
            logger.error("批量操作失败,index: " + index + ",ID: " + id, e);
            return false;
        }
    }

    /**
     * Search demo: runs a term query for {@code city = 北京市} against the {@code gdp_tops*}
     * indices and logs every returned hit plus the total hit count.
     * (Original author's note: a search like this can fully replace a get request.)
     *
     * @return {@code true} when the search call returned; {@code false} when an
     *         {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:43:57
     */
    public Boolean searchQuery() {
        // Target indices.
        SearchRequest searchRequest = new SearchRequest("gdp_tops*");
        // Query definition.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);

        try {
            SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
            // Log through the logger, like the other methods of this class, instead of stdout.
            for (SearchHit hit : response.getHits().getHits()) {
                logger.info(hit.getIndex());
                logger.info(String.valueOf(hit.getSourceAsMap()));
            }
            logger.info(response.getHits().getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("查询失败", e);
            return false;
        }
    }

    /**
     * Aggregation demo: groups the {@code gdp_tops*} indices by {@code company.keyword}, with
     * an average of {@code age} per group, and logs the hits and the average of the
     * {@code "Elastic"} group when that group exists.
     *
     * @return {@code true} when the search call returned; {@code false} when an
     *         {@link IOException} occurred
     * @author fwzz
     * @version created 2021-01-27 19:46:31
     */
    public Boolean aggsQuery() {
        // Target indices.
        SearchRequest searchRequest = new SearchRequest("gdp_tops*");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
        aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
        searchSourceBuilder.aggregation(aggregation);
        searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        // Paging, if wanted, goes here: searchSourceBuilder.from(0) / searchSourceBuilder.size(5).
        searchRequest.source(searchSourceBuilder);

        try {
            SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);

            // Approach 1: walk the raw hits.
            for (SearchHit hit : response.getHits().getHits()) {
                logger.info(hit.getIndex());
                logger.info(String.valueOf(hit.getSourceAsMap()));
            }

            // Approach 2: read the aggregation result. Each level may be absent (no
            // aggregations in the response, or no "Elastic" bucket), so check before
            // dereferencing.
            Terms byCompanyAggregation = null;
            Aggregations aggregations = response.getAggregations();
            if (aggregations != null) {
                byCompanyAggregation = aggregations.get("by_company");
            }
            Bucket elasticBucket = null;
            if (byCompanyAggregation != null) {
                elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
            }
            if (elasticBucket != null) {
                Avg averageAge = elasticBucket.getAggregations().get("average_age");
                logger.info("average_age: " + averageAge.getValue());
            } else {
                logger.info("未找到key为Elastic的分组");
            }

            logger.info(response.getHits().getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("聚合查询失败", e);
            return false;
        }
    }

    /**
     * Asynchronous search demo: submits a search on {@code gdp_tops*} and hands a listener to
     * the client. The listener reads the usual pieces of every hit (index, id, score, source)
     * without doing anything further with them; failures are logged.
     *
     * @return always {@code true}; the value is returned right after the request is submitted
     * @author fwzz
     * @version created 2021-01-27 19:50:00
     */
    public Boolean searchAsync() {
        // Target indices.
        SearchRequest request = new SearchRequest("gdp_tops*");

        ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
            @Override
            public void onFailure(Exception e) {
                logger.error(e.toString());
            }

            @Override
            public void onResponse(SearchResponse searchResponse) {
                SearchHit[] found = searchResponse.getHits().getHits();
                for (int n = 0; n < found.length; n++) {
                    SearchHit current = found[n];
                    // Index the hit came from.
                    String hitIndex = current.getIndex();
                    // Document id.
                    String hitId = current.getId();
                    // Relevance score.
                    float hitScore = current.getScore();
                    // Source as a JSON string.
                    String json = current.getSourceAsString();
                    // Source as a map.
                    Map<String, Object> fields = current.getSourceAsMap();
                    // The document's title.
                    String title = (String) fields.get("title");
                    // A list-valued field of the source.
                    List<Object> userList = (List<Object>) fields.get("user");
                    // A nested object of the source.
                    Map<String, Object> nested = (Map<String, Object>) fields.get("innerObject");
                }
            }
        };

        restClient.searchAsync(request, RequestOptions.DEFAULT, listener);
        return true;
    }

    /**
     * Scroll demo. When a query matches too much data for one response, start the search with
     * {@code SearchRequest.scroll()} to obtain a scroll id and fetch further pages with
     * {@link SearchScrollRequest}. This demo runs the initial search on {@code posts}
     * (page size 5), fetches one follow-up page, logs the total hit count and then releases
     * the scroll.
     *
     * @return {@code true} when both calls returned; {@code false} when an {@link IOException}
     *         occurred
     * @author fwzz
     * @version created 2021-01-27 20:00:14
     */
    public Boolean searchScroll() {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
        searchSourceBuilder.size(5);

        SearchRequest searchRequest = new SearchRequest("posts");
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));

        String scrollId = null;
        try {
            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
            scrollId = searchResponse.getScrollId();

            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(TimeValue.timeValueSeconds(30));
            SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = searchScrollResponse.getScrollId();

            SearchHits hits = searchScrollResponse.getHits();
            logger.info(hits.getTotalHits().toString());
            return true;
        } catch (IOException e) {
            logger.error("滚动查询失败", e);
            return false;
        } finally {
            // Release the scroll right away instead of leaving it open until its keep-alive
            // expires.
            clearScrollQuietly(scrollId);
        }
    }

    /** Best-effort release of a scroll; failures are logged and never propagated. */
    private void clearScrollQuietly(String scrollId) {
        if (scrollId == null) {
            return;
        }
        ClearScrollRequest clearRequest = new ClearScrollRequest();
        clearRequest.addScrollId(scrollId);
        try {
            restClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
        } catch (IOException | RuntimeException e) {
            // Cleanup must not mask the outcome of the search itself.
            logger.warn("清除scroll失败,scrollId: " + scrollId, e);
        }
    }

}

全局ID生成工具类

package com.fwz.tproject.testfunction.service;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.IdUtil;

/**
 * Generates globally unique ids: snowflake ids (via Hutool's {@link Snowflake}) and UUIDs.
 */
@Component
public class IdGeneratorSnowflake {
    private static final Logger log = LoggerFactory.getLogger(IdGeneratorSnowflake.class.getName());

    /** Upper bound of the snowflake worker id (Hutool's Snowflake uses a 5-bit worker field). */
    private static final long MAX_WORKER_ID = 31L;

    private long workerId = 0;
    private long datacenterId = 1;
    // Usable right after construction (e.g. from main()); replaced in init() once the
    // machine-specific worker id is known.
    private Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId);

    /**
     * Runs after dependency injection: derives the worker id from the local IPv4 address and
     * rebuilds the generator with it.
     *
     * <p>The address is reduced to the 0-31 range because the raw numeric address is far
     * outside what a snowflake worker id may be. Previously the computed value was stored but
     * never applied to the generator. Two hosts whose addresses share the same low five bits
     * get the same worker id.
     */
    @PostConstruct
    public synchronized void init() {
        try {
            workerId = NetUtil.ipv4ToLong(NetUtil.getLocalhostStr()) & MAX_WORKER_ID;
            log.info("当前机器的workerId: {}", workerId);
        } catch (Exception e) {
            log.warn("当前机器的workerId获取失败", e);
            // Fall back to a fixed, valid worker id. The old fallback repeated the local-address
            // lookup that had just failed and could produce an out-of-range value.
            workerId = 0;
        }
        snowflake = IdUtil.createSnowflake(workerId, datacenterId);
    }

    /**
     * Returns the next id from this bean's own generator (default datacenter id).
     *
     * @return a new snowflake id
     */
    public synchronized long snowflakeId() {
        return snowflake.nextId();
    }

    /**
     * Returns the next id for an explicitly chosen worker / datacenter pair.
     *
     * <p>The generator for a given pair is shared between calls. Creating a fresh generator
     * per call resets its sequence counter, so ids requested within the same millisecond
     * could repeat.
     *
     * @param workerId     worker id
     * @param datacenterId datacenter id
     * @return a new snowflake id
     */
    public synchronized long snowflakeId(long workerId, long datacenterId) {
        return IdUtil.getSnowflake(workerId, datacenterId).nextId();
    }

    /**
     * Generates a UUID string without dashes, e.g. 73a64edf935d4952a287739a66f96e06.
     *
     * @return the UUID string
     */
    public String simpleUUID() {
        return IdUtil.simpleUUID();
    }

    /**
     * Generates a UUID string with dashes, e.g. b12b6401-6f9c-4351-b2b6-d8afc9ab9272.
     *
     * @return the UUID string
     */
    public String randomUUID() {
        return IdUtil.randomUUID();
    }

    /** Manual check: prints 1000 ids generated for worker 0 / datacenter 0. */
    public static void main(String[] args) {
        IdGeneratorSnowflake generator = new IdGeneratorSnowflake();
        for (int i = 0; i < 1000; i++) {
            System.out.println(generator.snowflakeId(0, 0));
        }
    }
}

 

springboot整合ElasticSearch

标签:stop   boot   str   version   from   创建   rmq   init   each   

原文地址:https://www.cnblogs.com/fengwenzhee/p/14336734.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!