Java Client Options for Connecting to ES
-
The Elasticsearch Java API offers four kinds of client: TransportClient, RestClient, Jest, and Spring Data Elasticsearch.
-
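TransportClient and RestClient are Elasticsearch's native APIs. TransportClient supports versions such as 2.x and 5.x, but it is deprecated as of 7.0 and will be removed in 8.0, so it is not recommended. Its replacement is the Java High Level REST Client, which sends HTTP requests instead of Java-serialized requests.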
-
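Jest is developed by the Java community and is a Java HTTP REST client for ES. Its releases lag behind Elasticsearch: at the time of writing, the latest version targets ES 6.3.1, so it is not recommended either.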
-
Spring Data Elasticsearch is Spring's integration package for ES and plugs into the Spring ecosystem.
-
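RestClient is the recommended choice. Elastic officially provides two Java REST clients, a low-level one and a high-level one:
- low level rest client: talks to the cluster over HTTP; the caller has to marshal the request JSON and parse the response JSON.
- high level rest client: built on top of the low-level client; it adds APIs that marshal the request JSON and parse the response JSON for you.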
-
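Every API can be called synchronously or asynchronously. A synchronous method simply returns a result object; an asynchronous method carries the suffix Async and reports its result through a listener argument.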
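The difference between the two calling styles can be sketched without a cluster. The block below is only an illustration under stated assumptions: `DemoClient` and `DemoListener` are hypothetical stand-ins (the real client uses its own request, response and listener types), and the "cluster" is an in-memory map. It shows the shape described above: `get` returns the result directly, while `getAsync` returns at once and hands the result to a listener with `onResponse` and `onFailure` callbacks.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical callback with the onResponse/onFailure shape of a result listener. */
interface DemoListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

/** Hypothetical stand-in for a client; it reads from an in-memory map, not a cluster. */
class DemoClient {
    private final Map<String, String> docs = new ConcurrentHashMap<>();

    DemoClient() {
        docs.put("1", "{\"message\":\"trying out Elasticsearch\"}");
    }

    /** Synchronous variant: blocks and returns the result object (null if absent). */
    String get(String id) {
        return docs.get(id);
    }

    /** Asynchronous variant: returns immediately and reports through the listener. */
    CompletableFuture<Void> getAsync(String id, DemoListener<String> listener) {
        return CompletableFuture.runAsync(() -> {
            try {
                listener.onResponse(get(id));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }
}

class SyncAsyncDemo {
    public static void main(String[] args) {
        DemoClient client = new DemoClient();

        // synchronous call: the result is the return value
        System.out.println("sync: " + client.get("1"));

        // asynchronous call: the result arrives in the listener
        client.getAsync("1", new DemoListener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println("async: " + response);
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        }).join(); // wait only so the demo does not exit before the callback runs
    }
}
```

In application code the caller would normally not block on the asynchronous call; the `join()` here exists only to keep the demo process alive.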
-
Dependencies:
    <!-- Elasticsearch full-text search engine -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.2.0</version>
    </dependency>
Selected APIs
-
GetResponse: holds the document that was requested.
    // index and id come from the request, so they are always present
    String index = getResponse.getIndex();
    String id = getResponse.getId();
    // check whether the document exists
    if (getResponse.isExists()) {
        long version = getResponse.getVersion();
        // the document as a String
        String sourceAsString = getResponse.getSourceAsString();
        // the document as a Map
        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
        // the document as a byte array
        byte[] sourceAsBytes = getResponse.getSourceAsBytes();
    } else {
        // the document was not found
    }
-
Optional settings:
    // do not return the _source field (it is returned by default)
    request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

    // alternatively, choose which fields to include and which to exclude
    String[] includes = new String[]{"message", "*Date"};
    String[] excludes = Strings.EMPTY_ARRAY;
    // true: return the _source field
    FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
    request.fetchSourceContext(fetchSourceContext);

    // retrieve one stored field on its own (the field must be stored separately in the mapping)
    request.storedFields("message");
    GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
    String message = getResponse.getField("message").getValue();
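To make the include and exclude arrays concrete, here is a minimal sketch of the filtering idea in plain Java. It is not the library's implementation: `SourceFilterSketch` and its methods are hypothetical names, only top-level field names are handled, and the rules are assumed to be "an empty include list keeps everything, `*` matches any run of characters, and an exclude always wins".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

class SourceFilterSketch {

    /** True when name matches pattern, where '*' stands for any run of characters. */
    static boolean matches(String pattern, String name) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            // quote the literal pieces so that dots and brackets are not treated as regex
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.matches(regex.toString(), name);
    }

    static boolean anyMatch(String[] patterns, String name) {
        for (String pattern : patterns) {
            if (matches(pattern, name)) {
                return true;
            }
        }
        return false;
    }

    /** Keeps the fields selected by includes (all if empty) and not hit by excludes. */
    static Map<String, Object> filter(Map<String, Object> source,
                                      String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            boolean included = includes.length == 0 || anyMatch(includes, entry.getKey());
            boolean excluded = anyMatch(excludes, entry.getKey());
            if (included && !excluded) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");
        // with includes {"message", "*Date"} the "user" field is dropped
        System.out.println(filter(source, new String[]{"message", "*Date"}, new String[0]));
    }
}
```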
-
request.opType(DocWriteRequest.OpType | String): sets the operation type, given either as a DocWriteRequest.OpType value or as a String.
    IndexRequest request = new IndexRequest();
    request.opType(DocWriteRequest.OpType.CREATE);
    request.opType("create"); // "create" or "index" (the default)
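The practical difference between the two operation types is what happens when the id already exists: an index operation overwrites the document, while a create operation is rejected with a version conflict. The sketch below models only that rule with an in-memory map. `OpTypeSketch` and its nested `OpType` enum are hypothetical, and the `IllegalStateException` stands in for the conflict error a real cluster would report.

```java
import java.util.HashMap;
import java.util.Map;

class OpTypeSketch {

    /** Hypothetical mirror of the two operation types an index request accepts. */
    enum OpType { INDEX, CREATE }

    private final Map<String, String> docs = new HashMap<>();

    /**
     * Writes a document and returns true if the id was new.
     * CREATE refuses to touch an existing id; INDEX overwrites it.
     */
    boolean write(String id, String source, OpType opType) {
        if (opType == OpType.CREATE && docs.containsKey(id)) {
            throw new IllegalStateException(
                    "version conflict: document [" + id + "] already exists");
        }
        return docs.put(id, source) == null;
    }

    String get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        OpTypeSketch store = new OpTypeSketch();
        store.write("1", "first", OpType.CREATE);  // id is new, so create succeeds
        store.write("1", "second", OpType.INDEX);  // index overwrites the document
        try {
            store.write("1", "third", OpType.CREATE);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("stored: " + store.get("1"));
    }
}
```

This is why the write methods that use OpType.CREATE have to be prepared for a failure when the id is already taken.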
Examples
- Utility for obtaining a client connection
-
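(1) Obtain the client directly, with no custom settings.
    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    public class ESClientConnectionUtil {
        public static RestHighLevelClient client;

        /**
         * @Author haien
         * @Description Returns the shared client, creating it on first use
         * @Date 2019/7/18
         **/
        public static synchronized RestHighLevelClient getESClientConnection() {
            if (client == null) {
                // create the client; several node addresses can be passed
                // so that requests are balanced across the cluster
                client = new RestHighLevelClient(RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                        // , new HttpHost("192.168.200.100", 9200, "http")
                ));
            }
            return client;
        }

        /**
         * @Author haien
         * @Description Closes the client
         * @Date 2019/7/18
         **/
        public static synchronized void close() throws IOException {
            // nothing to close if the client was never created
            if (client != null) {
                client.close();
                client = null;
            }
        }
    }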
-
(2) Custom connection settings.
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;

    /**
     * @Author haien
     * @Description Configuration class for the elastic connection
     * @Date 2019/7/23
     **/
    @Configuration
    public class RestClientConfig {
        // elastic node addresses, each in the form ip:port
        @Value("${elasticsearch.ipAddrs}")
        private String[] ipAddrs;
        // max time to establish a connection to the target URL
        @Value("${elasticsearch.client.connectTimeOut}")
        private Integer connectTimeOut;
        // max time to wait for a response (reading data)
        @Value("${elasticsearch.client.socketTimeOut}")
        private Integer socketTimeOut;
        // max time to wait for a free connection from the pool
        @Value("${elasticsearch.client.connectionRequestTime}")
        private Integer connectionRequestTime;
        // max number of connections in the pool
        @Value("${elasticsearch.client.maxConnectNum}")
        private Integer maxConnectNum;
        // max concurrent connections to the same route
        @Value("${elasticsearch.client.maxConnectionPerRoute}")
        private Integer maxConnectionPerRoute;

        @Bean
        @Scope("singleton")
        public RestClient restClient() {
            return getFactory().getClient();
        }

        // registers the connection bean
        @Bean("client")
        @Scope("singleton")
        public RestHighLevelClient restHighClient() {
            // calls getFactory() below, then takes the connection from the factory
            return getFactory().getRestHighClient();
        }

        // client factory bean; Spring runs the factory's init() method when it
        // creates the bean and its close() method when the context shuts down
        @Bean(initMethod = "init", destroyMethod = "close")
        public ElasticRestClientFactory getFactory() {
            return ElasticRestClientFactory.build(httpHost(), connectTimeOut, socketTimeOut,
                    connectionRequestTime, maxConnectNum, maxConnectionPerRoute);
        }

        /**
         * @Author haien
         * @Description Wraps the elastic node addresses in HttpHost objects
         * @Date 2019/7/23
         **/
        @Bean
        public HttpHost[] httpHost() {
            HttpHost[] httpHosts = new HttpHost[ipAddrs.length];
            for (int i = 0; i < ipAddrs.length; i++) {
                String[] ipAddr = ipAddrs[i].split(":");
                // ip, port, scheme
                httpHosts[i] = new HttpHost(ipAddr[0], Integer.parseInt(ipAddr[1]), "http");
            }
            return httpHosts;
        }
    }

    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * @Author haien
     * @Description Factory class for client connections
     * @Date 2019/7/23
     **/
    public class ElasticRestClientFactory {
        private static final Logger logger =
                LoggerFactory.getLogger(ElasticRestClientFactory.class);

        // The settings start with default values; build(...) overwrites them.
        // max time to establish a connection to the target URL
        public static int CONNECT_TIMEOUT_MILLIS = 3000;
        // max time to wait for a response (reading data)
        public static int SOCKET_TIMEOUT_MILLIS = 6000;
        // max time to wait for a free connection from the pool
        public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 2000;
        // max number of connections in the pool
        public static int MAX_CONN_TOTAL = 15;
        // max concurrent connections to the same route
        public static int MAX_CONN_PER_ROUTE = 10;

        private static HttpHost[] HTTP_HOST;
        private RestClientBuilder builder;
        private RestClient restClient;
        private RestHighLevelClient restHighLevelClient;

        private static ElasticRestClientFactory restClientFactory =
                new ElasticRestClientFactory();

        /**
         * @Author haien
         * @Description Init method: applies the settings and creates the clients
         * @Date 2019/7/23
         **/
        public void init() {
            builder = RestClient.builder(HTTP_HOST);
            setConnectTimeOutConfig();
            setMutiConnectConfig();
            // build once and share the underlying low-level client, so that
            // both clients use the same connection pool
            restHighLevelClient = new RestHighLevelClient(builder);
            restClient = restHighLevelClient.getLowLevelClient();
            logger.info("ElasticSearch highLevelRestClient init successful");
        }

        /**
         * @Author haien
         * @Description Configures the timeouts
         * @Date 2019/7/23
         **/
        public void setConnectTimeOutConfig() {
            builder.setRequestConfigCallback(requestConfigBuilder -> {
                requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
                requestConfigBuilder
                        .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
                return requestConfigBuilder;
            });
        }

        /**
         * @Author haien
         * @Description Sets the connection limits of the asynchronous HTTP client
         * @Date 2019/7/23
         **/
        public void setMutiConnectConfig() {
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
                httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
                return httpClientBuilder;
            });
        }

        public RestClient getClient() {
            return restClient;
        }

        public RestHighLevelClient getRestHighClient() {
            return restHighLevelClient;
        }

        /**
         * @Author haien
         * @Description Builds the factory without overriding the timeouts
         * @Date 2019/7/23
         **/
        public static ElasticRestClientFactory build(HttpHost[] httpHost,
                Integer maxConnectNum, Integer maxConnectPerRoute) {
            HTTP_HOST = httpHost;
            MAX_CONN_TOTAL = maxConnectNum;
            MAX_CONN_PER_ROUTE = maxConnectPerRoute;
            return restClientFactory;
        }

        /**
         * @Author haien
         * @Description Builds the factory and overrides the timeouts
         * @Date 2019/7/23
         **/
        public static ElasticRestClientFactory build(HttpHost[] httpHost,
                Integer connectTimeOut, Integer socketTimeOut, Integer connectRequestTime,
                Integer maxConnectNum, Integer maxConnectPerRoute) {
            HTTP_HOST = httpHost;
            CONNECT_TIMEOUT_MILLIS = connectTimeOut;
            SOCKET_TIMEOUT_MILLIS = socketTimeOut;
            CONNECTION_REQUEST_TIMEOUT_MILLIS = connectRequestTime;
            MAX_CONN_TOTAL = maxConnectNum;
            MAX_CONN_PER_ROUTE = maxConnectPerRoute;
            return restClientFactory;
        }

        /**
         * @Author haien
         * @Description Destroy method: closes the connection
         * @Date 2019/7/23
         **/
        public void close() {
            if (restHighLevelClient != null) {
                try {
                    // closing the high-level client also closes its low-level client
                    restHighLevelClient.close();
                } catch (IOException e) {
                    logger.error("Failed to close the ElasticSearch client", e);
                }
            }
            logger.info("ElasticSearch highLevelRestClient closed");
        }
    }
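The address parsing in httpHost() assumes every entry has the form ip:port and fails with an unhelpful exception otherwise. A slightly more defensive version might look like the sketch below. It is an assumption-laden illustration, not part of the original project: `HostAddress` and `HostAddressParser` are hypothetical names, a missing port falls back to 9200 (the port used elsewhere in these notes), the scheme is fixed to http, and IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value holder for one parsed node address. */
class HostAddress {
    final String host;
    final int port;
    final String scheme;

    HostAddress(String host, int port, String scheme) {
        this.host = host;
        this.port = port;
        this.scheme = scheme;
    }
}

class HostAddressParser {
    // assumption: fall back to 9200 when an entry carries no port
    static final int DEFAULT_PORT = 9200;

    /** Parses "host" or "host:port" and rejects malformed entries with a clear message. */
    static HostAddress parse(String address) {
        String trimmed = address == null ? "" : address.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("empty node address");
        }
        int colon = trimmed.lastIndexOf(':');
        if (colon < 0) {
            return new HostAddress(trimmed, DEFAULT_PORT, "http");
        }
        String host = trimmed.substring(0, colon);
        if (host.isEmpty()) {
            throw new IllegalArgumentException("missing host in: " + address);
        }
        int port;
        try {
            port = Integer.parseInt(trimmed.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid port in: " + address, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range in: " + address);
        }
        return new HostAddress(host, port, "http");
    }

    static List<HostAddress> parseAll(String[] addresses) {
        List<HostAddress> result = new ArrayList<>();
        for (String address : addresses) {
            result.add(parse(address));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostAddress a : parseAll(new String[]{"127.0.0.1:9200", "es-node"})) {
            System.out.println(a.scheme + "://" + a.host + ":" + a.port);
        }
    }
}
```

Each parsed entry carries the three values that the HttpHost constructor in the configuration class expects: host, port and scheme.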
-
Obtaining the connection:
    @Component
    public class QueryTest {
        @Resource
        private RestHighLevelClient client;
    }
-
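Note: the client is registered in the Spring container through the configuration class RestClientConfig, and RestClientConfig uses @Value, so every injected property ends up inside the client bean. If the client is obtained with new, or in any way other than @Resource injection, @Value is never applied and the properties are not injected. In addition, QueryTest, the class that holds the client, is where the search methods are implemented, but it cannot be tested from a main method in the same class: main can only create QueryTest with new, and an object created that way has a null client field.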
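Why the field stays null can be seen in a toy model of dependency injection. Everything in the sketch below is hypothetical: `Injected` plays the role of @Resource, `FakeClient` stands in for the client bean, `QueryService` for QueryTest, and `TinyContainer` is a few lines of reflection, far simpler than what Spring actually does. The point is only that the field is filled by the container after it creates the object, so an object created with new never goes through that step.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical marker that plays the role of an injection annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Injected {
}

/** Hypothetical stand-in for the client bean. */
class FakeClient {
}

/** Hypothetical stand-in for the component that needs the client. */
class QueryService {
    @Injected
    FakeClient client;
}

/** Toy container: creates objects and fills their @Injected fields from registered beans. */
class TinyContainer {
    private final Map<Class<?>, Object> beans = new HashMap<>();

    <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    <T> T create(Class<T> type) {
        try {
            Constructor<T> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            T instance = constructor.newInstance();
            for (Field field : type.getDeclaredFields()) {
                if (field.isAnnotationPresent(Injected.class)) {
                    field.setAccessible(true);
                    // injection happens here, after construction
                    field.set(instance, beans.get(field.getType()));
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + type.getName(), e);
        }
    }
}

class InjectionDemo {
    public static void main(String[] args) {
        TinyContainer container = new TinyContainer();
        container.register(FakeClient.class, new FakeClient());

        QueryService managed = container.create(QueryService.class);
        QueryService manual = new QueryService();

        System.out.println("managed client is null: " + (managed.client == null));
        System.out.println("manual client is null: " + (manual.client == null));
    }
}
```

For the same reason, a test of the real class has to obtain it from the Spring context, for example through a Spring-aware test class, rather than from a main method inside the class itself.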
-
CRUD methods for indices and documents
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.elasticsearch.action.DocWriteRequest;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.bulk.BulkItemResponse;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.support.master.AcknowledgedResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.Assert;

    public class Test {
        private static final Logger logger = LoggerFactory.getLogger(Test.class);

        private RestHighLevelClient client = ESClientConnectionUtil.getESClientConnection();

        /**
         * @Author haien
         * @Description Creates an index
         * @Date 2019/7/18
         **/
        public boolean createIndex() throws IOException {
            // index name
            CreateIndexRequest request = new CreateIndexRequest("books")
                    // shards, replicas and refresh interval
                    .settings(Settings.builder()
                            .put("index.number_of_shards", 3)
                            .put("index.number_of_replicas", 2)
                            .put("index.refresh_interval", "10s")
                            // default analyzer
                            // .put("analysis.analyzer.default.tokenizer", "ik_smart")
                    );

            // the fullText field
            Map<String, Object> fullText = new HashMap<>();
            fullText.put("type", "text");
            fullText.put("analyzer", "ik_max_word");
            fullText.put("search_analyzer", "ik_smart");
            fullText.put("term_vector", "with_positions_offsets");
            // the fondCode field
            Map<String, Object> fondCode = new HashMap<>();
            fondCode.put("type", "keyword");

            Map<String, Object> properties = new HashMap<>();
            properties.put("fullText", fullText);
            properties.put("fondCode", fondCode);
            Map<String, Object> mapping = new HashMap<>();
            mapping.put("properties", properties);
            request.mapping(mapping);

            // Alternative: the same Map structure for a single "message" field
            // Map<String, Object> message = new HashMap<>();
            // message.put("type", "text");
            // Map<String, Object> properties = new HashMap<>();
            // properties.put("message", message);
            // Map<String, Object> mapping = new HashMap<>();
            // mapping.put("properties", properties);
            // request.mapping(mapping);

            // Alternative: the mapping as a JSON string
            // request.mapping(
            //         "{\n" +
            //         "  \"properties\": {\n" +
            //         "    \"message\": {\n" +
            //         "      \"type\": \"text\"\n" +
            //         "    }\n" +
            //         "  }\n" +
            //         "}",
            //         XContentType.JSON);

            CreateIndexResponse response =
                    client.indices().create(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        }

        /**
         * @Author haien
         * @Description Deletes an index
         * @Date 2019/7/21
         **/
        public boolean deleteIndex(String indexName) throws IOException {
            // check that the index exists first; deleting a missing index
            // throws an ElasticsearchException (NOT_FOUND) instead of returning false
            if (!existIndex(indexName)) {
                return false;
            }
            DeleteIndexRequest request = new DeleteIndexRequest(indexName);
            AcknowledgedResponse response =
                    client.indices().delete(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        }

        /**
         * @Author haien
         * @Description Checks whether an index exists
         * @Date 2019/7/21
         **/
        public boolean existIndex(String indexName) throws IOException {
            Assert.notNull(indexName, "index name must not be null");
            GetIndexRequest request = new GetIndexRequest(indexName);
            return client.indices().exists(request, RequestOptions.DEFAULT);
        }

        /**
         * @Author haien
         * @Description Adds a document
         * @Date 2019/7/21
         **/
        public boolean addDocument(String indexName, String id, Map<String, Object> docMap) {
            // Four ways to supply the document source: a JSON string,
            // key-value pairs, a Map, or an XContentBuilder.

            // 1) JSON string
            // IndexRequest request = new IndexRequest("posts");
            // request.id("1");
            // String jsonString = "{" +
            //         "\"user\":\"kimchy\"," +
            //         "\"postDate\":\"2013-01-30\"," +
            //         "\"message\":\"trying out Elasticsearch\"" +
            //         "}";
            // // the string is taken as JSON; source(...) returns the IndexRequest
            // request.source(jsonString, XContentType.JSON);

            // 2) key-value pairs, converted to JSON automatically
            // IndexRequest request = new IndexRequest("posts").id("1")
            //         .source("user", "kimchy",
            //                 "postDate", new Date(),
            //                 "message", "trying out Elasticsearch");

            // 3) Map, converted to JSON automatically
            // Map<String, Object> jsonMap = new HashMap<>();
            // jsonMap.put("user", "kimchy");
            // jsonMap.put("postDate", new Date());
            // jsonMap.put("message", "trying out Elasticsearch");
            // IndexRequest request = new IndexRequest("posts").id("1").source(jsonMap);

            // 4) XContentBuilder, a helper for generating JSON
            // XContentBuilder builder = XContentFactory.jsonBuilder();
            // builder.startObject();
            // {
            //     builder.field("user", "kimchy");
            //     builder.field("postDate", new Date());
            //     builder.field("message", "trying out Elasticsearch");
            // }
            // builder.endObject();
            // IndexRequest request = new IndexRequest("posts").id("1").source(builder);

            boolean opFlag = true;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(id, "document id must not be null");
                Assert.notNull(docMap, "docMap must not be null");
                // no type is given, so the default _doc is used
                IndexRequest request = new IndexRequest(indexName).id(id).source(docMap);
                request.opType(DocWriteRequest.OpType.CREATE);
                client.index(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                logger.error("Failed to add document, index [{}], document id [{}]",
                        indexName, id, e);
                opFlag = false;
            }
            return opFlag;
        }

        /**
         * @Author haien
         * @Description Fetches a document
         * @Date 2019/7/21
         **/
        public Map<String, Object> getDocument(String indexName, String id) {
            if (!existDocument(indexName, id)) {
                logger.error("Index or id does not exist, index [{}], document id [{}]",
                        indexName, id);
                return null;
            }
            Map<String, Object> docMap = null;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(id, "document id must not be null");
                GetRequest request = new GetRequest(indexName, id);
                GetResponse response = client.get(request, RequestOptions.DEFAULT);
                docMap = response.getSourceAsMap();
            } catch (Exception e) {
                logger.error("Failed to get document by id, index [{}], document id [{}]",
                        indexName, id, e);
            }
            return docMap;
        }

        /**
         * @Author haien
         * @Description Checks whether a document exists
         * @Date 2019/7/21
         **/
        public boolean existDocument(String indexName, String id) {
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(id, "document id must not be null");
                GetRequest request = new GetRequest(indexName, id);
                // only existence matters, so skip _source and stored fields
                request.fetchSourceContext(new FetchSourceContext(false));
                request.storedFields("_none_");
                return client.exists(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("Failed to check document existence, index [{}], document id [{}]",
                        indexName, id, e);
                return false;
            }
        }

        /**
         * @Author haien
         * @Description Deletes a document
         * @Date 2019/7/21
         **/
        public boolean deleteDocument(String indexName, String id) {
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(id, "document id must not be null");
                // delete only if the document exists
                if (!existDocument(indexName, id)) {
                    return false;
                }
                client.delete(new DeleteRequest(indexName, id), RequestOptions.DEFAULT);
                return true;
            } catch (IOException e) {
                logger.error("Failed to delete document, index [{}], document id [{}]",
                        indexName, id, e);
                return false;
            }
        }

        /**
         * @Author haien
         * @Description Updates a document
         * @Date 2019/7/21
         **/
        public boolean updateDocument(String indexName, String id,
                                      Map<String, Object> docMap) {
            boolean opFlag = true;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(id, "document id must not be null");
                Assert.notNull(docMap, "docMap must not be null");
                UpdateRequest request = new UpdateRequest(indexName, id).doc(docMap);
                client.update(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("Failed to update document, index [{}], document id [{}]",
                        indexName, id, e);
                opFlag = false;
            }
            return opFlag;
        }

        /**
         * @Author haien
         * @Description Adds documents in bulk
         * @Date 2019/7/21
         **/
        public boolean bulkAddDocument(String indexName, List<Map<String, Object>> docMaps) {
            boolean opFlag = true;
            BulkResponse responses = null;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(docMaps, "docMaps must not be null");
                BulkRequest request = new BulkRequest();
                for (Map<String, Object> docMap : docMaps) {
                    request.add(new IndexRequest(indexName).id(docMap.get("id") + "")
                            .source(docMap).opType(DocWriteRequest.OpType.CREATE));
                }
                responses = client.bulk(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("Failed to add documents in bulk, index [{}]", indexName, e);
                opFlag = false;
            }
            // handle the response item by item
            if (responses != null) {
                for (BulkItemResponse bulkItemResponse : responses) {
                    if (bulkItemResponse.isFailed()) {
                        // a failed item carries no response object
                        logger.error("Bulk item failed: {}",
                                bulkItemResponse.getFailureMessage());
                        opFlag = false;
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                    DocWriteRequest.OpType opType = bulkItemResponse.getOpType();
                    if (opType == DocWriteRequest.OpType.INDEX
                            || opType == DocWriteRequest.OpType.CREATE) {
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        // handle a successfully added document
                    } else if (opType == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        // handle a successfully updated document
                    } else if (opType == DocWriteRequest.OpType.DELETE) {
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        // handle a successfully deleted document
                    }
                }
            }
            return opFlag;
        }

        /**
         * @Author haien
         * @Description Updates documents in bulk
         * @Date 2019/7/21
         **/
        public boolean bulkUpdateDocument(String indexName,
                                          List<Map<String, Object>> docMaps) {
            boolean opFlag = true;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(docMaps, "docMaps must not be null");
                BulkRequest request = new BulkRequest();
                for (Map<String, Object> docMap : docMaps) {
                    request.add(
                            new UpdateRequest(indexName, docMap.get("id") + "").doc(docMap));
                }
                client.bulk(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("Failed to update documents in bulk, index [{}]", indexName, e);
                opFlag = false;
            }
            return opFlag;
        }

        /**
         * @Author haien
         * @Description Deletes documents in bulk
         * @Date 2019/7/25
         **/
        public boolean bulkdeleteDocument(String indexName,
                                          List<Map<String, Object>> docMaps) {
            boolean opFlag = true;
            try {
                Assert.notNull(indexName, "index name must not be null");
                Assert.notNull(docMaps, "docMaps must not be null");
                BulkRequest request = new BulkRequest();
                for (Map<String, Object> docMap : docMaps) {
                    request.add(new DeleteRequest(indexName, docMap.get("id") + ""));
                }
                client.bulk(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                logger.error("Failed to delete documents in bulk, index [{}]", indexName, e);
                opFlag = false;
            }
            return opFlag;
        }
    }
-
Follow-up note: ElasticSearch Java API, the various queries
-
Code samples: ideaProjects/jar-test/elastic and test/QueryTestTest; the samples contain additional material.