详细请前往码云查看源码:https://gitee.com/chxlay/be-helpful behelpful-search 模块 (依然存在很多 bug,算不上开源项目,仅仅适用于学习交流)
依赖:(以下正式使用的是 ES 7.6.2 版本)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <!-- ES 官方客户端 官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency> <!-- Gson --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency>
Yml 文件方式配置 (可以直接配置到 Nacos )
spring: # ElasticSearch 连接配置 elasticsearch: rest: uris: - behelpful-es1:9200 - behelpful-es2:9200 username: behelpful password: behelpful9527 # ES 最大连接数(Jest独有的,使用High-level客户端不需要配置以下) maxTotalConn: 10 # 一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute defaultMaxTotalConnPerRoute: 5
客户端容器注入 Jest,若使用的是 high-level 客户端则不需要配置,Spring 会自动配置注入
(推荐使用 High-level 客户端,由于 Jest 客户端过旧,码云分享项目中取消了 Jest 的整合方案,若果需要 Jest 整合方案的朋友,可以自行整合,也可以留言,我抽时间将 Jest 整合方案从 OneNote 笔记中 迁移 到 博客中)
Jest 客户端 默认使用的JSON 系列化工具类为 Gson,日期类型 系列化需要自定义,请前往此文进行查看:
开始
建立 ES 实体类 映射 索引注解(类似 MP 的 @TableName)
/** * ElasticSearch实体类注解,标明实体类存储的索引及Type */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ISearchDocument { /** * ES 索引 纯小写命名 * * @return */ String indexName() default ""; /** * ES7以后默认_doc,即移除了type(小驼峰命名) * * @return */ String type() default "_doc"; }
描述定义 ES 索引使用的,每个 ES document数据 都需要具备这些基本属性
/** * 定义描述 ES 索引的类 */ @Data @Accessors(chain = true, fluent = true) public class IndexDefinition { private String index; private String type; private String id; }
为了规范 ES 实体类,需要创建一个 抽象类,或接口(推荐使用接口,多实现),所有的 ES 实体类都需要实现此 Bean 接口,并实重写 esId 的 方法
/** * 规范 Es实体类 */ public abstract class ISearchModel implements Serializable { @Getter @Setter private String index; /** * 获得ES主键ID * * @return */ public abstract String getEsId(); }
操作 ES 数据的通用方法的抽取封装,分页类Page我偷懒了没有自己写, 借用的是 MP 的,若果项目中没有使用 MP,那么自己编写一个替代即可
public interface ISearchService<T extends ISearchModel> { /** * 创建模板 * * @param sourceStr * @return */ Boolean putTemplate(String sourceStr); /** * 索引是否存在 * * @param indexName * @return */ Boolean existsIndex(String indexName); /** * 创建索引 * * @param sourceStr * @return */ Boolean createIndex(String sourceStr); /** * 添加数据保存到ES * * @param entity * @return */ Boolean saveEntity(T entity); /** * 异步添加数据(异步) * * @param entity * @param listener */ void saveEntityAsy(T entity, ActionListener listener); /** * 批量添加数据到ES * * @param entities * @return */ Boolean saveBatch(Collection<T> entities); /** * 指定ID查询 * * @param id * @return */ T selectById(String id); /** * 指定ID集合查询 * * @param ids * @return */ List<T> listByIds(Collection<String> ids); /** * 根据Id 修改数据(同步) * * @param entity * @return */ Boolean updateById(T entity); /** * 根据Id 修改同步数据(异步) * * @param entity * @param listener */ void updateByIdAsy(T entity, ActionListener listener); /** * 指定Id删除 * * @param id * @return */ Boolean deleteById(String id); /** * 指定Id删除(异步) * * @param id * @param listener * @return */ void deleteByIdAsy(String id, ActionListener listener); /** * 指定Id批量删除 * * @param ids * @return */ Boolean deleteByIds(Collection<String> ids); /** * 高级搜索 * * @param searchRequest * @param options * @return */ List<T> search(SearchSourceBuilder searchRequest, RequestOptions options); /** * 条件查询搜索 * * @param page * @param searchSource * @return */ Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource); /** * 多个索引下查询,多Type下查询 * * @param page * @param searchSource * @param indices 多索引查询 * @return */ Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource, String... indices); }
抽象实现类:
/** * 官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html */ public abstract class ISearchServiceImpl<T extends ISearchModel> implements ISearchService<T> { @Autowired protected RestHighLevelClient esClient; protected transient Log log = LogFactory.getLog(this.getClass()); protected static final Gson gson = new Gson(); /** * 文档: * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-put-template.html * * @param sourceStr * @return */ @Override public Boolean putTemplate(String sourceStr) { PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest("demo*"); templateRequest.source(sourceStr, XContentType.JSON); try { AcknowledgedResponse response = esClient.indices().putTemplate(templateRequest, RequestOptions.DEFAULT); return response.isAcknowledged(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public Boolean existsIndex(String indexName) { if (StrUtil.isBlank(indexName)) { // 实现类泛型中的类对象 Class<T> clazz = this.entityClass(); // 从实现类泛型中去实体类的注解中的 index 值 indexName = this.parseEsAnno(clazz).index(); } GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); try { boolean exists = esClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT); return exists; } catch (IOException e) { log.error(e.getMessage()); return false; } } /** * 文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-create-index.html * * @param sourceStr * @return */ @Override public Boolean createIndex(String sourceStr) { // 实现类泛型中的类对象 Class<T> clazz = this.entityClass(); IndexDefinition IndexDefinition = this.parseEsAnno(clazz); CreateIndexRequest indexRequest = new CreateIndexRequest(IndexDefinition.index()); indexRequest.source(sourceStr, XContentType.JSON); try { CreateIndexResponse indexResponse = esClient.indices().create(indexRequest, RequestOptions.DEFAULT); return indexResponse.isAcknowledged(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public Boolean saveEntity(T entity) { Class<? extends ISearchModel> clazz = entity.getClass(); IndexDefinition IndexDefinition = this.parseEsAnno(clazz); // 解析实体类映射的 Index 信息 IndexRequest indexRequest = new IndexRequest(IndexDefinition.index()); indexRequest.id(entity.getEsId()); String jsonStr = gson.toJson(entity); indexRequest.source(jsonStr, XContentType.JSON); try { IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT); int status = indexResponse.status().getStatus(); return status == RestStatus.CREATED.getStatus() || status == RestStatus.OK.getStatus(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public void saveEntityAsy(T entity, ActionListener listener) { Class<? extends ISearchModel> clazz = entity.getClass(); IndexDefinition IndexDefinition = this.parseEsAnno(clazz); // 解析实体类映射的 Index 信息 IndexRequest indexRequest = new IndexRequest(IndexDefinition.index()); indexRequest.id(entity.getEsId()); String jsonStr = gson.toJson(entity); indexRequest.source(jsonStr, XContentType.JSON); esClient.indexAsync(indexRequest, RequestOptions.DEFAULT, listener); } @Override public Boolean saveBatch(Collection<T> entities) { // 获取Service接口中泛型的实体类型 Class<T> clazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(clazz); BulkRequest bulkRequest = new BulkRequest(index.index()); // 添加批量实体类请求 for (T entity : entities) { IndexRequest request = new IndexRequest(); request.id(entity.getEsId()); request.source(gson.toJson(entity), XContentType.JSON); bulkRequest.add(request); } try { BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT); int status = bulkResponse.status().getStatus(); return status == RestStatus.OK.getStatus(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public T selectById(String id) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); GetRequest getRequest = new GetRequest(index.index()); getRequest.id(id); try { GetResponse getResponse = esClient.get(getRequest, RequestOptions.DEFAULT); T entity = null; if (getResponse.isExists()) { String sourceJsonStr = getResponse.getSourceAsString(); entity = gson.fromJson(sourceJsonStr, entityClazz); } return entity; } catch (IOException e) { log.error(e.getMessage()); return null; } } @Override public List<T> listByIds(Collection<String> ids) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); MultiGetRequest getRequest = new MultiGetRequest(); for (String id : ids) { MultiGetRequest.Item item = new MultiGetRequest.Item(index.index(), id); getRequest.add(item); } try { MultiGetResponse multiGetResponse = esClient.mget(getRequest, RequestOptions.DEFAULT); List<T> entities = Arrays.stream(multiGetResponse.getResponses()) .map(res -> gson.fromJson(res.getResponse().getSourceAsString(), entityClazz)) .collect(Collectors.toList()); return entities; } catch (IOException e) { log.error(e.getMessage()); return new ArrayList<>(0); } } @Override public Boolean updateById(T entity) { Class<? extends ISearchModel> clazz = entity.getClass(); IndexDefinition IndexDefinition = this.parseEsAnno(clazz); UpdateRequest updateRequest = new UpdateRequest(IndexDefinition.index(), entity.getEsId()); String jsonStr = gson.toJson(entity); updateRequest.doc(jsonStr, XContentType.JSON); try { UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT); int status = updateResponse.status().getStatus(); return status == RestStatus.OK.getStatus(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public void updateByIdAsy(T entity, ActionListener listener) { Class<? extends ISearchModel> clazz = entity.getClass(); IndexDefinition IndexDefinition = this.parseEsAnno(clazz); UpdateRequest updateRequest = new UpdateRequest(IndexDefinition.index(), entity.getEsId()); String jsonStr = gson.toJson(entity); updateRequest.doc(jsonStr, XContentType.JSON); esClient.updateAsync(updateRequest, RequestOptions.DEFAULT, listener); } @Override public Boolean deleteById(String id) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); DeleteRequest deleteRequest = new DeleteRequest(index.index()); deleteRequest.id(id); try { DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT); int status = deleteResponse.status().getStatus(); return status == RestStatus.OK.getStatus(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public void deleteByIdAsy(String id, ActionListener listener) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); DeleteRequest deleteRequest = new DeleteRequest(index.index()); deleteRequest.id(id); esClient.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener); } @Override public Boolean deleteByIds(Collection<String> ids) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); BulkRequest bulkRequest = new BulkRequest(index.index()); for (String id : ids) { DeleteRequest deleteRequest = new DeleteRequest().id(id); bulkRequest.add(deleteRequest); } try { BulkResponse response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT); int status = response.status().getStatus(); return status == RestStatus.OK.getStatus(); } catch (IOException e) { log.error(e.getMessage()); return false; } } @Override public List<T> search(SearchSourceBuilder searchSource, RequestOptions options) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); SearchRequest searchRequest = this.searchBuilder(); searchRequest.source(searchSource); try { SearchResponse search = esClient.search(searchRequest, options); SearchHits hits = search.getHits(); List<T> entities = Arrays.stream(hits.getHits()) .map(hit -> gson.fromJson(hit.getSourceAsString(), entityClazz)) .collect(Collectors.toList()); return entities; } catch (IOException e) { log.error(e.getMessage()); return new ArrayList<>(0); } } @Override public Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); SearchRequest searchRequest = this.searchBuilder(); searchRequest.source(searchSource); return this.page(page, searchRequest, entityClazz); } @Override public Page<T> pageQuery(Page<T> page, SearchSourceBuilder searchSource, String... indices) { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); SearchRequest searchRequest = this.searchBuilder(indices); searchRequest.source(searchSource); return this.page(page, searchRequest, entityClazz); } private Page<T> page(Page<T> page, SearchRequest searchRequest, Class<T> entityClazz) { try { SearchResponse search = esClient.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = search.getHits(); TotalHits totalHits = hits.getTotalHits(); long total = totalHits.value; List<T> entities = Arrays.stream(hits.getHits()) .map(hit -> { T entity = gson.fromJson(hit.getSourceAsString(), entityClazz); entity.setIndex(hit.getIndex()); return entity; }) .collect(Collectors.toList()); page.setTotal(total).setRecords(entities).setSize(entities.size()); } catch (IOException e) { log.error(e.getMessage()); } return page; } SearchRequest searchBuilder(String... indices) { SearchRequest searchRequest = new SearchRequest(indices); return searchRequest; } SearchRequest searchBuilder() { // 获取Service接口中泛型的实体类型 Class<T> entityClazz = this.entityClass(); // 解析索引信息 IndexDefinition index = this.parseEsAnno(entityClazz); return this.searchBuilder(index.index()); } /** * 抽取距离查询的封装语句(坐标对象字段和规定的一致才能使用此方法) * * @param search * @param bool * @param lat * @param lon * @param distance */ protected void buildGeo(SearchSourceBuilder search, BoolQueryBuilder bool, Double lat, Double lon, Long distance) { // 按距离范围筛选 GeoDistanceQueryBuilder locationQuery = new GeoDistanceQueryBuilder("location"); locationQuery.point(lat, lon); locationQuery.distance(distance, DistanceUnit.METERS); bool.filter(locationQuery); // 按距离排序 GeoDistanceSortBuilder locationSort = new GeoDistanceSortBuilder("location", lat, lon); locationSort.unit(DistanceUnit.METERS); locationSort.order(SortOrder.ASC); search.sort(locationSort); } /** * 解析实体类映射ES的库表 * * @param clazz ES实体bean的类信息 * @return */ @SneakyThrows protected IndexDefinition parseEsAnno(Class<? extends ISearchModel> clazz) { boolean hasAnno = clazz.isAnnotationPresent(ISearchDocument.class); if (!hasAnno) { // ES 实体 Bean 类没有 ISearchDocument 注解,抛出异常告知调用者 throw new BaseRuntimeException("没有加入实体类注解 @ISearchDocument", "ERROR"); } ISearchDocument esAnno = clazz.getAnnotation(ISearchDocument.class); String index = esAnno.indexName(); if (Objects.equals(index, "")) { // 纯小写 index = clazz.getSimpleName().toLowerCase(); } String type = esAnno.type(); if (Objects.equals(type, "")) { // 转小驼峰 type = clazz.getSimpleName().substring(0, 1).toLowerCase() + clazz.getSimpleName().substring(1); } IndexDefinition indexModel = new IndexDefinition().index(index).type(type); return indexModel; } /** * 获取调用方法实现类中泛型的具体类对象 * * @return */ protected Class<T> entityClass() { // 当前调用方法的 Impl实现类的父类的类型 ParameterizedType superclass = (ParameterizedType) this.getClass().getGenericSuperclass(); // 当前调用方法的 Impl实现类的泛型的类型,实现类必须带泛型,否则报错 Type[] type = superclass.getActualTypeArguments(); Class clazz = (Class) type[0]; return clazz; } }
high-level 异步调用 Listener
/** * ES 异步删除数据执行监听器*/ @Slf4j @Component public class DeleteListener implements ActionListener<DeleteResponse> { @Override public void onResponse(DeleteResponse response) { log.warn("index:{},异步删除数据成功,action:{},status:{}", response.getIndex(), response.getResult().name(), response.status().name()); } @Override public void onFailure(Exception e) { log.error("异步添加数据失败,case:{},msg:{}", e.getCause(), e.getMessage()); } }
@Slf4j @Component public class IndexListener implements ActionListener<IndexResponse> { }
@Slf4j @Component public class UpdateListener implements ActionListener<UpdateResponse> { }
开始使用:
1、创建 ES 实体类:
/** * TableName(value = "test_table_name")--> Binlog 对应数据库的表 * ISearchDocument(indexName = "test_es_info") --> 对应 ES 的索引 */ @Data @TableName(value = "test_table_name") @ISearchDocument(indexName = "test_es_info") public class TestEsInfo extends ISearchModel { /** * 主键id */ private String id; /** * 名称 */ private String name; /** * 地区 */ private String address; /** * 坐标位置 */ private GeoModel location; /** * 更新时间 */ @JsonAdapter(LocalDateTimeAdapter.class) private LocalDateTime updateTime; /** * 创建时间 */ @JsonAdapter(LocalDateTimeAdapter.class) private LocalDateTime createTime; @Override public String getEsId() { return id; } }
2、创建实体类对应的模板(如不创建,ES会自动根据实体类的对象生成一个 mapping,自动生成的很多地方都不符合项目需求,所以建议自己创建一个 mapping):
{ "settings": { "number_of_shards": 3, "number_of_replicas": 2 }, "mappings": { "dynamic": true, "properties": { "id": { "type": "keyword" }, "index": { "max_result_window": 1000000 }, "name": { "type": "text", "analyzer": "ik_smart" }, "address": { "type": "keyword" }, "rank": { "type": "text", "analyzer": "ik_smart" }, "location": { "type": "geo_point" }, "updateTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }, "createTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } } } }
Service 类:
public interface TestSearchService extends ISearchService<TestEsInfo> { }
实现类:
@Service public class TestSearchServiceImpl extends ISearchServiceImpl<TestEsInfo> implements TestSearchService { }
Service 中可复制此函数,根据以上编排的 Json 文件创建索引:
@Override public Boolean createIndex(String indexName, String sourceStr) { CreateIndexRequest indexRequest = new CreateIndexRequest(indexName); indexRequest.source(sourceStr, XContentType.JSON); try { CreateIndexResponse indexResponse = esClient.indices().create(indexRequest, RequestOptions.DEFAULT); return indexResponse.isAcknowledged(); } catch (IOException e) { log.error(e.getMessage()); return false; } }
至此项目整合 ES 完毕,使用只需要编写响应的 实体、service、impl 进行相应的继承 即可使用,有兴趣的朋友也可以将以上部分 整合为一个 Starter 然后通过项目引入依赖即可使用,
扩展: 若果因为 ES 版本与 Spring 中相应的 ES 版本不一致 导致无法 自动注入 high-level-client 则需要自行手动注入:
配置属性
/** * 可选的配置(Spring自动会配置的) * Spring-boot-aoutconfigura 版本在自带 ES 7.x 以上版本的无需配置此处,Spring 已经自动加载配置属性*/ @Data @ConfigurationProperties("spring.elasticsearch.rest") public class ElasticsearchSearchProperties { /** * 集群中所哟地址 */ private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200")); /** * 链接用户名 */ private String username; /** * 链接密码 */ private String password; /** * 链接超时时间 */ private int connectionTimeout = -1; /** * 读超时时长秒 */ private int readTimeout = 30; }
配置类:
/** * 可选的配置(Spring自动会配置的) * Spring-boot-aoutconfigura 版本在自带 ES 7.x 以上版本的无需配置此处,Spring 已经自动加载*/ @EnableConfigurationProperties(value = {ElasticsearchSearchProperties.class}) public class SearchConfig { @Autowired private ElasticsearchSearchProperties searchProperties; /** * 官方文档: * * @param restClientBuilder * @return * @see {https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html} */ @Bean @ConditionalOnMissingBean public RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) { return new RestHighLevelClient(restClientBuilder); } @Bean @ConditionalOnMissingBean public RestClientBuilder restClientBuilder() { HttpHost[] hosts = searchProperties.getUris().stream().map(this::createHttpHost).toArray((x$0) -> new HttpHost[x$0] ); // 集群链接地址构建 RestClientBuilder builder = RestClient.builder(hosts); // 配置用户名和密码 final BasicCredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials( // 用户名 searchProperties.getUsername(), // 密码 searchProperties.getPassword() ) ); builder //设置超时 .setRequestConfigCallback((RequestConfig.Builder requestConfigBuilder) -> { requestConfigBuilder.setConnectTimeout(searchProperties.getConnectionTimeout()); requestConfigBuilder.setSocketTimeout(-1); requestConfigBuilder.setConnectionRequestTimeout(-1); return requestConfigBuilder; }) // 配置客户端密码 .setHttpClientConfigCallback((HttpAsyncClientBuilder httpClientBuilder) -> { httpClientBuilder .disableAuthCaching() .setDefaultCredentialsProvider(provider); return httpClientBuilder; }); return builder; } private HttpHost createHttpHost(String uri) { try { return this.createHttpHost(URI.create(uri)); } catch (IllegalArgumentException var3) { return HttpHost.create(uri); } } private HttpHost createHttpHost(URI uri) { if (!StringUtils.hasLength(uri.getUserInfo())) { return HttpHost.create(uri.toString()); } else { try { return HttpHost.create((new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString()); } catch (URISyntaxException exception) { throw new IllegalStateException(exception); } } } }