Integrating ES into a Project (Hands-On)

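For details, see the source on Gitee: https://gitee.com/chxlay/be-helpful, module behelpful-search (it still has many bugs and does not count as a real open-source project; it is only meant for learning and discussion)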

Dependencies (the version actually used below is ES 7.6.2):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<!-- Official ES client. Official docs: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<!-- Gson -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

YAML configuration (it can also be placed directly in Nacos)

spring:
  # ElasticSearch connection settings
  elasticsearch:
    rest:
      uris:
        - behelpful-es1:9200
        - behelpful-es2:9200
      username: behelpful
      password: behelpful9527

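      # Maximum total ES connections (Jest only; not needed with the High-level client)
      maxTotalConn: 10
      # By default a route gets at most 2 connections. A route is one connection target, i.e. one remote host. Total connections = number of routes * defaultMaxTotalConnPerRoute
      defaultMaxTotalConnPerRoute: 5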

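Registering the Jest client as a bean: if you use the high-level client this step is unnecessary, because Spring auto-configures and injects it.

(The High-level client is recommended. Because the Jest client is too old, the Jest integration was removed from the project shared on Gitee. If you need the Jest integration you can wire it up yourself, or leave a comment and I will find time to move it from my OneNote notes to the blog.)

The Jest client uses Gson as its default JSON serializer, and date types need a custom serializer; see this article: 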


Getting started

Create an annotation that maps an ES entity class to its index (similar to MP's @TableName)

/**
 * Annotation for ElasticSearch entity classes; declares the index and type the entity is stored in
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ISearchDocument {
   /**
    * ES index name, all lowercase
    *
    * @return
    */
   String indexName() default "";
   /**
    * From ES 7 on this defaults to _doc, i.e. types were removed (lower camel case if set)
    *
    * @return
    */
   String type() default "_doc";
}
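
The service layer shown later reads this annotation back through reflection. As a minimal standalone sketch of that lookup (the names DemoSearchDocument, AnnotatedEntity, ShopInfo and IndexNameDemo are made up for this illustration and are not part of the project), the index name can be resolved like this, falling back to the lowercase class name when indexName is left empty:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Locale;

// Stand-in for the ISearchDocument annotation above (renamed so the sketch is self-contained).
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@interface DemoSearchDocument {
    String indexName() default "";
}

// Hypothetical entities used only for this illustration.
@DemoSearchDocument(indexName = "test_es_info")
class AnnotatedEntity {
}

@DemoSearchDocument
class ShopInfo {
}

class IndexNameDemo {

    /** Returns the annotated index name, or the lowercase simple class name when none is given. */
    static String resolveIndex(Class<?> clazz) {
        DemoSearchDocument anno = clazz.getAnnotation(DemoSearchDocument.class);
        if (anno == null) {
            throw new IllegalStateException(clazz.getName() + " is missing @DemoSearchDocument");
        }
        // ES index names must be lowercase, hence the fallback lowercases the class name
        return anno.indexName().isEmpty()
                ? clazz.getSimpleName().toLowerCase(Locale.ROOT)
                : anno.indexName();
    }

    public static void main(String[] args) {
        System.out.println(resolveIndex(AnnotatedEntity.class));
        System.out.println(resolveIndex(ShopInfo.class));
    }
}
```

The lookup only works because the annotation is retained at RUNTIME; with the default CLASS retention, getAnnotation would return null.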

Describes an ES index; every ES document needs these basic attributes

/**
 * Class describing an ES index
 */
@Data
@Accessors(chain = true, fluent = true)
public class IndexDefinition {
    private String index;
    private String type;
    private String id;
}

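To keep ES entity classes consistent, create an abstract class or an interface (an interface is recommended, since a class can implement several). Every ES entity class must extend or implement it and override the getEsId method

/**
 * Base contract for ES entity classes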
 */
public abstract class ISearchModel implements Serializable {

   @Getter
   @Setter
   private String index;

   /**
    * Get the ES primary key (document id)
    *
    * @return
    */
   public abstract String getEsId();
}

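The common ES data operations are extracted into the interface below. I was lazy with the Page class and borrowed the one from MP instead of writing my own; if your project does not use MP, write a replacement yourself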

public interface ISearchService<T extends ISearchModel> {

   /**
    * Create an index template
    *
    * @param sourceStr
    * @return
    */
   Boolean putTemplate(String sourceStr);


   /**
    * Check whether an index exists
    *
    * @param indexName
    * @return
    */
   Boolean existsIndex(String indexName);

   /**
    * Create an index
    *
    * @param sourceStr
    * @return
    */
   Boolean createIndex(String sourceStr);

   /**
    * Save an entity to ES
    *
    * @param entity
    * @return
    */
   Boolean saveEntity(T entity);

   /**
    * Save an entity to ES (asynchronous)
    *
    * @param entity
    * @param listener
    */
   void saveEntityAsy(T entity, ActionListener listener);

   /**
    * Save a batch of entities to ES
    *
    * @param entities
    * @return
    */
   Boolean saveBatch(Collection<T> entities);

   /**
    * Get by id
    *
    * @param id
    * @return
    */
   T selectById(String id);

   /**
    * Get by a collection of ids
    *
    * @param ids
    * @return
    */
   List<T> listByIds(Collection<String> ids);

   /**
    * Update by id (synchronous)
    *
    * @param entity
    * @return
    */
   Boolean updateById(T entity);

   /**
    * Update by id (asynchronous)
    *
    * @param entity
    * @param listener
    */
   void updateByIdAsy(T entity, ActionListener listener);

   /**
    * Delete by id
    *
    * @param id
    * @return
    */
   Boolean deleteById(String id);

   /**
    * Delete by id (asynchronous)
    *
    * @param id
    * @param listener
    */
   void deleteByIdAsy(String id, ActionListener listener);

   /**
    * Delete a batch by ids
    *
    * @param ids
    * @return
    */
   Boolean deleteByIds(Collection<String> ids);

   /**
    * Advanced search
    *
    * @param searchRequest
    * @param options
    * @return
    */
   List<T> search(SearchSourceBuilder searchRequest, RequestOptions options);

   /**
    * Paged search by query conditions
    *
    * @param page
    * @param searchSource
    * @return
    */
   Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource);

   /**
    * Paged search across multiple indices (and multiple types)
    *
    * @param page
    * @param searchSource
    * @param indices      the indices to query
    * @return
    */
   Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource, String... indices);
}
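
For projects without MP, the Page type used by pageQuery needs a substitute. The following is only a sketch of what such a class might look like; SimplePage and its methods are hypothetical names and deliberately do not mirror MP's API. It carries the page number, page size, total hit count and records, and derives the search offset and the page count:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal stand-in for a paging container; not the MyBatis-Plus Page API. */
class SimplePage<T> {
    private final long current;   // 1-based page number
    private final long size;      // records per page
    private long total;           // total number of hits reported by the search
    private List<T> records = new ArrayList<>();

    SimplePage(long current, long size) {
        if (current < 1 || size < 1) {
            throw new IllegalArgumentException("current and size must be >= 1");
        }
        this.current = current;
        this.size = size;
    }

    long getCurrent() { return current; }
    long getSize() { return size; }
    long getTotal() { return total; }
    List<T> getRecords() { return records; }

    // Chained setters, so a result can be filled in one expression
    SimplePage<T> setTotal(long total) { this.total = total; return this; }
    SimplePage<T> setRecords(List<T> records) { this.records = records; return this; }

    /** Zero-based offset of the first record on this page (usable as a search "from" value). */
    long offset() { return (current - 1) * size; }

    /** Number of pages needed to hold all hits, rounding up. */
    long pages() { return (total + size - 1) / size; }
}
```

With a container like this, offset() and getSize() would typically be copied into the search request before it is executed, so that the page parameters actually limit the hits returned.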

Abstract implementation class:

/**
 * Official docs: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html
 */
public abstract class ISearchServiceImpl<T extends ISearchModel> implements ISearchService<T> {

    @Autowired
    protected RestHighLevelClient esClient;
    protected transient Log log = LogFactory.getLog(this.getClass());
    protected static final Gson gson = new Gson();

    /**
     * Docs:
     * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-put-template.html
     *
     * @param sourceStr
     * @return
     */
    @Override
    public Boolean putTemplate(String sourceStr) {
        PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest("demo*");
        templateRequest.source(sourceStr, XContentType.JSON);
        try {
            AcknowledgedResponse response = esClient.indices().putTemplate(templateRequest, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }


    @Override
    public Boolean existsIndex(String indexName) {
        if (StrUtil.isBlank(indexName)) {
            // Entity class taken from the implementation's generic parameter
            Class<T> clazz = this.entityClass();
            // Fall back to the index declared in the entity class's annotation
            indexName = this.parseEsAnno(clazz).index();
        }
        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        try {
            boolean exists = esClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
            return exists;
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    /**
     * Docs: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-create-index.html
     *
     * @param sourceStr
     * @return
     */
    @Override
    public Boolean createIndex(String sourceStr) {
        // Entity class taken from the implementation's generic parameter
        Class<T> clazz = this.entityClass();
        IndexDefinition definition = this.parseEsAnno(clazz);
        CreateIndexRequest indexRequest = new CreateIndexRequest(definition.index());
        indexRequest.source(sourceStr, XContentType.JSON);
        try {
            CreateIndexResponse indexResponse = esClient.indices().create(indexRequest, RequestOptions.DEFAULT);
            return indexResponse.isAcknowledged();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    @Override
    public Boolean saveEntity(T entity) {
        Class<? extends ISearchModel> clazz = entity.getClass();
        // Resolve the index information the entity class maps to
        IndexDefinition definition = this.parseEsAnno(clazz);
        IndexRequest indexRequest = new IndexRequest(definition.index());
        indexRequest.id(entity.getEsId());
        String jsonStr = gson.toJson(entity);
        indexRequest.source(jsonStr, XContentType.JSON);
        try {
            IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
            int status = indexResponse.status().getStatus();
            return status == RestStatus.CREATED.getStatus() || status == RestStatus.OK.getStatus();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    @Override
    public void saveEntityAsy(T entity, ActionListener listener) {
        Class<? extends ISearchModel> clazz = entity.getClass();
        // Resolve the index information the entity class maps to
        IndexDefinition definition = this.parseEsAnno(clazz);
        IndexRequest indexRequest = new IndexRequest(definition.index());
        indexRequest.id(entity.getEsId());
        String jsonStr = gson.toJson(entity);
        indexRequest.source(jsonStr, XContentType.JSON);
        esClient.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
    }

    @Override
    public Boolean saveBatch(Collection<T> entities) {
        // Resolve the entity type from the service's generic parameter
        Class<T> clazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(clazz);
        BulkRequest bulkRequest = new BulkRequest(index.index());
        // Add one index request per entity
        for (T entity : entities) {
            IndexRequest request = new IndexRequest();
            request.id(entity.getEsId());
            request.source(gson.toJson(entity), XContentType.JSON);
            bulkRequest.add(request);
        }
        try {
            BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            // The bulk response reports OK even when individual items fail, so check the items instead
            return !bulkResponse.hasFailures();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    @Override
    public T selectById(String id) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);
        GetRequest getRequest = new GetRequest(index.index());
        getRequest.id(id);
        try {
            GetResponse getResponse = esClient.get(getRequest, RequestOptions.DEFAULT);
            T entity = null;
            if (getResponse.isExists()) {
                String sourceJsonStr = getResponse.getSourceAsString();
                entity = gson.fromJson(sourceJsonStr, entityClazz);
            }
            return entity;
        } catch (IOException e) {
            log.error(e.getMessage());
            return null;
        }
    }

    @Override
    public List<T> listByIds(Collection<String> ids) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);

        MultiGetRequest getRequest = new MultiGetRequest();
        for (String id : ids) {
            MultiGetRequest.Item item = new MultiGetRequest.Item(index.index(), id);
            getRequest.add(item);
        }
        try {
            MultiGetResponse multiGetResponse = esClient.mget(getRequest, RequestOptions.DEFAULT);
            List<T> entities = Arrays.stream(multiGetResponse.getResponses())
                    // Skip failed lookups and ids that do not exist, which would otherwise cause a NullPointerException
                    .filter(res -> !res.isFailed() && res.getResponse().isExists())
                    .map(res -> gson.fromJson(res.getResponse().getSourceAsString(), entityClazz))
                    .collect(Collectors.toList());
            return entities;
        } catch (IOException e) {
            log.error(e.getMessage());
            return new ArrayList<>(0);
        }
    }

    @Override
    public Boolean updateById(T entity) {
        Class<? extends ISearchModel> clazz = entity.getClass();
        IndexDefinition definition = this.parseEsAnno(clazz);
        UpdateRequest updateRequest = new UpdateRequest(definition.index(), entity.getEsId());
        String jsonStr = gson.toJson(entity);
        updateRequest.doc(jsonStr, XContentType.JSON);
        try {
            UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
            int status = updateResponse.status().getStatus();
            return status == RestStatus.OK.getStatus();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    @Override
    public void updateByIdAsy(T entity, ActionListener listener) {
        Class<? extends ISearchModel> clazz = entity.getClass();
        IndexDefinition definition = this.parseEsAnno(clazz);
        UpdateRequest updateRequest = new UpdateRequest(definition.index(), entity.getEsId());
        String jsonStr = gson.toJson(entity);
        updateRequest.doc(jsonStr, XContentType.JSON);
        esClient.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);
    }

    @Override
    public Boolean deleteById(String id) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);

        DeleteRequest deleteRequest = new DeleteRequest(index.index());
        deleteRequest.id(id);
        try {
            DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
            int status = deleteResponse.status().getStatus();
            return status == RestStatus.OK.getStatus();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }

    @Override
    public void deleteByIdAsy(String id, ActionListener listener) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);

        DeleteRequest deleteRequest = new DeleteRequest(index.index());
        deleteRequest.id(id);
        esClient.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener);
    }

    @Override
    public Boolean deleteByIds(Collection<String> ids) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);

        BulkRequest bulkRequest = new BulkRequest(index.index());
        for (String id : ids) {
            DeleteRequest deleteRequest = new DeleteRequest().id(id);
            bulkRequest.add(deleteRequest);
        }
        try {
            BulkResponse response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            // The bulk response reports OK even when individual items fail, so check the items instead
            return !response.hasFailures();
        } catch (IOException e) {
            log.error(e.getMessage());
            return false;
        }
    }


    @Override
    public List<T> search(SearchSourceBuilder searchSource, RequestOptions options) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        SearchRequest searchRequest = this.searchBuilder();
        searchRequest.source(searchSource);
        try {
            SearchResponse search = esClient.search(searchRequest, options);
            SearchHits hits = search.getHits();
            List<T> entities = Arrays.stream(hits.getHits())
                    .map(hit -> gson.fromJson(hit.getSourceAsString(), entityClazz))
                    .collect(Collectors.toList());
            return entities;
        } catch (IOException e) {
            log.error(e.getMessage());
            return new ArrayList<>(0);
        }
    }

    @Override
    public Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        SearchRequest searchRequest = this.searchBuilder();
        searchRequest.source(searchSource);
        return this.page(page, searchRequest, entityClazz);
    }

    @Override
    public Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource, String... indices) {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        SearchRequest searchRequest = this.searchBuilder(indices);
        searchRequest.source(searchSource);
        return this.page(page, searchRequest, entityClazz);
    }

    private Page<T> page(Page<T> page, SearchRequest searchRequest, Class<T> entityClazz) {
        try {
            SearchResponse search = esClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = search.getHits();
            TotalHits totalHits = hits.getTotalHits();
            long total = totalHits.value;
            List<T> entities = Arrays.stream(hits.getHits())
                    .map(hit -> {
                        T entity = gson.fromJson(hit.getSourceAsString(), entityClazz);
                        entity.setIndex(hit.getIndex());
                        return entity;
                    })
                    .collect(Collectors.toList());
            page.setTotal(total).setRecords(entities).setSize(entities.size());
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return page;
    }

    SearchRequest searchBuilder(String... indices) {
        SearchRequest searchRequest = new SearchRequest(indices);
        return searchRequest;
    }


    SearchRequest searchBuilder() {
        // Resolve the entity type from the service's generic parameter
        Class<T> entityClazz = this.entityClass();
        // Resolve the index information
        IndexDefinition index = this.parseEsAnno(entityClazz);
        return this.searchBuilder(index.index());
    }

    /**
     * Shared helper for distance queries (usable only when the coordinate field is named as assumed here, i.e. "location")
     *
     * @param search
     * @param bool
     * @param lat
     * @param lon
     * @param distance
     */
    protected void buildGeo(SearchSourceBuilder search, BoolQueryBuilder bool, Double lat, Double lon, Long distance) {
        // Filter by distance range
        GeoDistanceQueryBuilder locationQuery = new GeoDistanceQueryBuilder("location");
        locationQuery.point(lat, lon);
        locationQuery.distance(distance, DistanceUnit.METERS);
        bool.filter(locationQuery);

        // Sort by distance
        GeoDistanceSortBuilder locationSort = new GeoDistanceSortBuilder("location", lat, lon);
        locationSort.unit(DistanceUnit.METERS);
        locationSort.order(SortOrder.ASC);
        search.sort(locationSort);
    }

    /**
     * Resolve the ES index and type an entity class maps to
     *
     * @param clazz class of the ES entity bean
     * @return
     */
    @SneakyThrows
    protected IndexDefinition parseEsAnno(Class<? extends ISearchModel> clazz) {
        boolean hasAnno = clazz.isAnnotationPresent(ISearchDocument.class);
        if (!hasAnno) {
            // The ES entity bean has no ISearchDocument annotation; throw to tell the caller
            throw new BaseRuntimeException("Entity class is missing the @ISearchDocument annotation", "ERROR");
        }
        ISearchDocument esAnno = clazz.getAnnotation(ISearchDocument.class);
        String index = esAnno.indexName();
        if (Objects.equals(index, "")) {
            // All lowercase
            index = clazz.getSimpleName().toLowerCase();
        }
        String type = esAnno.type();
        if (Objects.equals(type, "")) {
            // Convert to lower camel case
            type = clazz.getSimpleName().substring(0, 1).toLowerCase() + clazz.getSimpleName().substring(1);
        }
        IndexDefinition indexModel = new IndexDefinition().index(index).type(type);
        return indexModel;
    }

    /**
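     * Get the concrete class of the generic parameter declared by the implementing class
     *
     * @return
     */
    @SuppressWarnings("unchecked")
    protected Class<T> entityClass() {
        // Generic superclass of the concrete Impl class the method is called on
        ParameterizedType superclass = (ParameterizedType) this.getClass().getGenericSuperclass();
        // Type arguments of that superclass; the Impl class must declare the generic type, otherwise this fails
        Type[] type = superclass.getActualTypeArguments();
        return (Class<T>) type[0];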
    }
}
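
The entityClass() trick above relies on the fact that a subclass keeps its superclass's type arguments in its class metadata. A minimal standalone sketch of the same technique follows; GenericBase, StringService and EntityClassDemo are hypothetical names for this illustration. It also shows the limitation: the lookup only works for a class that directly extends the generic base with a concrete type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical generic base class standing in for ISearchServiceImpl<T>.
abstract class GenericBase<T> {

    /** Resolves T from the direct subclass declaration, e.g. "extends GenericBase<String>". */
    @SuppressWarnings("unchecked")
    Class<T> entityClass() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            // Happens for raw subclasses or for subclasses of subclasses
            throw new IllegalStateException(getClass().getName() + " does not directly declare a type argument");
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        if (!(arg instanceof Class)) {
            // Happens when the argument is itself a type variable or a parameterized type
            throw new IllegalStateException("Type argument is not a concrete class: " + arg);
        }
        return (Class<T>) arg;
    }
}

// Direct subclass with a concrete type argument: resolvable.
class StringService extends GenericBase<String> {
}

// Second-level subclass: its generic superclass is the plain class StringService, so the lookup fails.
class DerivedStringService extends StringService {
}

class EntityClassDemo {
    public static void main(String[] args) {
        System.out.println(new StringService().entityClass());
        try {
            new DerivedStringService().entityClass();
        } catch (IllegalStateException e) {
            System.out.println("not resolvable: " + e.getMessage());
        }
    }
}
```

The explicit instanceof checks replace the bare casts in the original, so a misdeclared service fails with a descriptive message instead of a ClassCastException.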

Listeners for the high-level client's asynchronous calls

/**
 * Listener for asynchronous ES delete operations
 */
@Slf4j
@Component
public class DeleteListener implements ActionListener<DeleteResponse> {


   @Override
   public void onResponse(DeleteResponse response) {
      log.warn("index:{}, async delete succeeded, action:{}, status:{}", response.getIndex(), response.getResult().name(), response.status().name());
   }

   @Override
   public void onFailure(Exception e) {
      log.error("async delete failed, cause:{}, msg:{}", e.getCause(), e.getMessage());
   }
}
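@Slf4j
@Component
public class IndexListener implements ActionListener<IndexResponse> {
   // onResponse / onFailure omitted here; they must be implemented, in the same way as in DeleteListener
}
@Slf4j
@Component
public class UpdateListener implements ActionListener<UpdateResponse> {
   // onResponse / onFailure omitted here; they must be implemented, in the same way as in DeleteListener
}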

Usage:

1. Create the ES entity class:

/**
 * TableName(value = "test_table_name") --> the database table the Binlog corresponds to
 * ISearchDocument(indexName = "test_es_info") --> the ES index
 */
@Data
@TableName(value = "test_table_name")
@ISearchDocument(indexName = "test_es_info")
public class TestEsInfo extends ISearchModel {
    /**
     * Primary key id
     */
    private String id;
    /**
     * Name
     */
    private String name;

    /**
     * Region
     */
    private String address;
    /**
     * Coordinates
     */
    private GeoModel location;

    /**
     * Update time
     */
    @JsonAdapter(LocalDateTimeAdapter.class)
    private LocalDateTime updateTime;
    /**
     * Creation time
     */
    @JsonAdapter(LocalDateTimeAdapter.class)
    private LocalDateTime createTime;

    @Override
    public String getEsId() {
        return id;
    }
}

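2. Create the template for the entity class (if you don't, ES generates a mapping automatically from the documents it receives; much of that generated mapping won't fit the project's needs, so defining the mapping yourself is recommended):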

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-put-template.html

{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2,
    "max_result_window": 1000000
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "analyzer": "ik_smart"
      },

      "address": {
        "type": "keyword"
      },
      "rank": {
        "type": "text",
        "analyzer": "ik_smart"
      },

      "location": {
        "type": "geo_point"
      },
      "updateTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "createTime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      }
    }
  }
}
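
The date fields in this mapping only accept values in the declared format, so whatever serializes updateTime and createTime has to produce exactly that pattern. The LocalDateTimeAdapter used on the entity is not shown in this post; the sketch below is only an assumption about the formatting such an adapter would need to perform, using a hypothetical EsDateFormatDemo class and the standard java.time API:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; it only demonstrates the pattern declared in the mapping above.
class EsDateFormatDemo {

    // Must match the "format" of the date fields in the mapping exactly
    static final DateTimeFormatter ES_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats a timestamp the way the mapping expects it to arrive. */
    static String format(LocalDateTime time) {
        return time.format(ES_FORMAT);
    }

    /** Parses a value read back from the document source. */
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, ES_FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2020, 5, 1, 8, 30, 0);
        String text = format(time);
        System.out.println(text);
        System.out.println(parse(text).equals(time));
    }
}
```

Note that this pattern drops fractions of a second, so a round trip is only lossless for timestamps with whole seconds.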

Service interface:

public interface TestSearchService extends ISearchService<TestEsInfo> {
}

Implementation class:

@Service
public class TestSearchServiceImpl extends ISearchServiceImpl<TestEsInfo> implements TestSearchService {
}

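You can copy the following method into the Service to create the index from the JSON file laid out above (because it is marked @Override, this two-argument signature must also be declared on the service interface):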

@Override
public Boolean createIndex(String indexName, String sourceStr) {
   CreateIndexRequest indexRequest = new CreateIndexRequest(indexName);
   indexRequest.source(sourceStr, XContentType.JSON);
   try {
      CreateIndexResponse indexResponse = esClient.indices().create(indexRequest, RequestOptions.DEFAULT);
      return indexResponse.isAcknowledged();
   } catch (IOException e) {
      log.error(e.getMessage());
      return false;
   }
}

 

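At this point the ES integration is complete. To use it, write the corresponding entity, service and impl and have them extend the classes above. Anyone interested can also package the parts above as a Starter and use it simply by adding the dependency to a project.

Extension: if the ES version does not match the ES version Spring manages, so that the high-level client cannot be auto-configured and injected, you need to register it manually: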

Configuration properties

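/**
 * Optional configuration (Spring normally sets this up automatically).
 * If the spring-boot-autoconfigure version in use already ships with ES 7.x or later, this class is not needed; Spring loads these properties itself.
 */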
@Data
@ConfigurationProperties("spring.elasticsearch.rest")
public class ElasticsearchSearchProperties {

    /**
     * All node addresses in the cluster
     */
    private List<String> uris = new ArrayList<>(Collections.singletonList("http://localhost:9200"));
    /**
     * Connection username
     */
    private String username;
    /**
     * Connection password
     */
    private String password;
    /**
     * Connection timeout
     */
    private int connectionTimeout = -1;
    /**
     * Read timeout, in seconds
     */
    private int readTimeout = 30;
}

Configuration class:

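/**
 * Optional configuration (Spring normally sets this up automatically).
 * If the spring-boot-autoconfigure version in use already ships with ES 7.x or later, this class is not needed; Spring creates the client itself.
 */
@Configuration
@EnableConfigurationProperties(value = {ElasticsearchSearchProperties.class})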
public class SearchConfig {

    @Autowired
    private ElasticsearchSearchProperties searchProperties;

    /**
     * Official docs:
     *
     * @param restClientBuilder
     * @return
     * @see {https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html}
     */
    @Bean
    @ConditionalOnMissingBean
    public RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
        return new RestHighLevelClient(restClientBuilder);
    }

    @Bean
    @ConditionalOnMissingBean
    public RestClientBuilder restClientBuilder() {
        HttpHost[] hosts = searchProperties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new);

        // Build the client from the cluster node addresses
        RestClientBuilder builder = RestClient.builder(hosts);

        // Configure username and password
        final BasicCredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(
                        // Username
                        searchProperties.getUsername(),
                        // Password
                        searchProperties.getPassword()
                )
        );

        builder
                // Set timeouts
                .setRequestConfigCallback((RequestConfig.Builder requestConfigBuilder) -> {
                    requestConfigBuilder.setConnectTimeout(searchProperties.getConnectionTimeout());
                    requestConfigBuilder.setSocketTimeout(-1);
                    requestConfigBuilder.setConnectionRequestTimeout(-1);
                    return requestConfigBuilder;
                })
                // Configure the client credentials
                .setHttpClientConfigCallback((HttpAsyncClientBuilder httpClientBuilder) -> {
                    httpClientBuilder
                            .disableAuthCaching()
                            .setDefaultCredentialsProvider(provider);
                    return httpClientBuilder;
                });

        return builder;
    }

    private HttpHost createHttpHost(String uri) {
        try {
            return this.createHttpHost(URI.create(uri));
        } catch (IllegalArgumentException var3) {
            return HttpHost.create(uri);
        }
    }


    private HttpHost createHttpHost(URI uri) {
        if (!StringUtils.hasLength(uri.getUserInfo())) {
            return HttpHost.create(uri.toString());
        } else {
            try {
                return HttpHost.create((new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString());
            } catch (URISyntaxException exception) {
                throw new IllegalStateException(exception);
            }
        }
    }

}
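
The two createHttpHost helpers exist to drop any user-info part (user:password@) from a configured URI before it is turned into a host, since credentials are supplied separately through the credentials provider. A standalone sketch of just that URI handling, using only java.net.URI (HostUriDemo and stripUserInfo are hypothetical names, and the sample credentials are made up):

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper isolating the user-info stripping done by createHttpHost above.
class HostUriDemo {

    /** Returns the URI without its user-info part; URIs without user info are returned unchanged. */
    static String stripUserInfo(String raw) {
        URI uri = URI.create(raw);
        String userInfo = uri.getUserInfo();
        if (userInfo == null || userInfo.isEmpty()) {
            return raw;
        }
        try {
            // Rebuild the URI from its components, passing null for the user-info component
            return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(),
                    uri.getPath(), uri.getQuery(), uri.getFragment()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(stripUserInfo("http://behelpful:secret@behelpful-es1:9200"));
        System.out.println(stripUserInfo("http://behelpful-es2:9200"));
    }
}
```

A bare host:port entry such as the ones in the YAML above carries no user info, which is why the original code simply passes such values on to HttpHost.create.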

 
