ElasticSearch Java API: CRUD

Ways to connect to Elasticsearch from Java

  • There are four kinds of Java clients for Elasticsearch: TransportClient, RestClient, Jest, and Spring Data Elasticsearch.

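  • TransportClient and RestClient are Elasticsearch's native APIs. TransportClient supports 2.x and 5.x, but it is deprecated as of 7.0 and removed in 8.0, so it is not recommended. Its replacement is the Java High Level REST Client, which sends HTTP requests instead of serialized Java requests.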

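  • Jest is a Java HTTP REST client for Elasticsearch developed by the Java community. Its updates lag behind Elasticsearch releases (at the time of writing its latest version targets ES 6.3.1), so it is not recommended either.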

  • Spring Data Elasticsearch is Spring's Elasticsearch integration and plugs into the Spring ecosystem.

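  • RestClient is recommended. Elastic officially provides two Java REST Client flavors, a low-level one and a high-level one:

  1. Low Level REST Client: talks to the cluster over HTTP; you build the request JSON and parse the response JSON yourself.
  2. High Level REST Client: built on top of the low-level client, it adds APIs that build the request JSON and parse the response JSON for you.
  • Every API comes in a synchronous and an asynchronous form. A synchronous method returns the response object directly; the asynchronous variant has the same name with an Async suffix and reports the result through a listener argument.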

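  • Dependencies (use a client version compatible with the cluster: the same major version, with a minor version no newer than the cluster's):

      <!--Elasticsearch full-text search engine-->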
      <dependency>
          <groupId>org.elasticsearch.client</groupId>
          <artifactId>elasticsearch-rest-high-level-client</artifactId>
          <version>7.2.0</version>
      </dependency>
    
      <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch</artifactId>
          <version>7.2.0</version>
      </dependency>
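The synchronous and asynchronous forms described above differ only in how the result comes back: `get(request, options)` returns it, while `getAsync(request, options, listener)` later calls `onResponse` or `onFailure` on the listener. The sketch below is plain Java with no Elasticsearch dependency. `Listener` is a hypothetical stand-in that mirrors the two callbacks of Elasticsearch's `ActionListener`, and `AsyncBridge.toFuture` (also hypothetical) shows one common way to turn such a callback-style call into a `CompletableFuture` that callers can wait on or chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical listener with the same two callbacks as Elasticsearch's ActionListener. */
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

/** Hypothetical adapter from a callback-style ("...Async") call to a CompletableFuture. */
final class AsyncBridge {
    private AsyncBridge() {}

    /**
     * @param asyncCall starts the asynchronous operation and must notify
     *                  the listener it receives exactly once
     */
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);       // success: complete with the response
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e); // failure: propagate the exception
            }
        });
        return future;
    }
}
```

With the high-level client, the lambda passed to `toFuture` would call something like `client.getAsync(request, RequestOptions.DEFAULT, actionListener)`, where `actionListener` is an `ActionListener<GetResponse>` that forwards its two callbacks to the given `Listener`. The calling thread is not blocked unless it calls `join()` or `get()` on the future.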
    

Selected APIs

  • GetResponse: holds the document returned by a get request.

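      //getResponse is the result of client.get(request, RequestOptions.DEFAULT)
      //These two properties come from the request, so they are always present
      String index = getResponse.getIndex();
      String id = getResponse.getId();
      
      //Check whether the document was found
      if (getResponse.isExists()) {
          long version = getResponse.getVersion();
          //The source as a String
          String sourceAsString = getResponse.getSourceAsString();
          //The source as a Map
          Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
          //The source as a byte array
          byte[] sourceAsBytes = getResponse.getSourceAsBytes();          
      } else {
          //Not found: a GetResponse is still returned (isExists() is false), no exception is thrown
      }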
    
  • Optional settings on a GetRequest:

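      //Do not return the _source field (it is returned by default)
      request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
      
      //Or: return _source, keeping only the included fields minus the excluded ones (wildcards allowed)
      String[] includes = new String[]{"message", "*Date"};
      String[] excludes = Strings.EMPTY_ARRAY;
      //true: fetch the _source field
      FetchSourceContext fetchSourceContext = 
          new FetchSourceContext(true, includes, excludes);
      request.fetchSourceContext(fetchSourceContext);
      
      //Retrieve a specific stored field (the field must be mapped with "store": true)
      request.storedFields("message"); 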
      GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
      String message = getResponse.getField("message").getValue();
    
  • request.opType(...): sets the operation type, either as a DocWriteRequest.OpType value or as a String.

      IndexRequest request=new IndexRequest("posts");
      
      request.opType(DocWriteRequest.OpType.CREATE); 
      request.opType("create"); //same as above, as a String: "create" or "index" (the default)
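The two op types only differ when the id is already taken: with `index` (the default) the write replaces the existing document, while with `create` it is rejected and Elasticsearch answers with a version conflict (HTTP 409). The in-memory model below illustrates just those semantics; it is not how Elasticsearch stores documents, and `OpType`, `DocStore` and `write` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical op types mirroring the two values accepted by IndexRequest.opType. */
enum OpType { INDEX, CREATE }

/** Hypothetical in-memory model of index-vs-create semantics (not Elasticsearch internals). */
final class DocStore {
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /** Returns "created" or "updated", like the result reported for an index request. */
    String write(String id, Map<String, Object> source, OpType opType) {
        boolean exists = docs.containsKey(id);
        if (exists && opType == OpType.CREATE) {
            // Elasticsearch would reject this with a 409 version conflict
            throw new IllegalStateException("document [" + id + "] already exists");
        }
        docs.put(id, new HashMap<>(source)); // INDEX overwrites the whole document
        return exists ? "updated" : "created";
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }
}
```

This is why `addDocument` below, which uses `OpType.CREATE`, fails rather than silently overwriting when the id already exists.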
    

Examples

  1. Utility for obtaining a client connection
  • (1) Get a client directly, without any custom settings.

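      import java.io.IOException;
      
      import org.apache.http.HttpHost;
      import org.elasticsearch.client.RestClient;
      import org.elasticsearch.client.RestHighLevelClient;
      
      public class ESClientConnectionUtil {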
      
          public static RestHighLevelClient client;
      
          /**
           * @Author haien
           * @Description Get the client (created on first use)
           * @Date 2019/7/18
           * @Param []
           * @return org.elasticsearch.client.RestHighLevelClient
           **/
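          public static synchronized RestHighLevelClient getESClientConnection(){
              //synchronized: avoid creating two clients when several threads call this at once
              if(client==null){
                  //Create the client; pass several node addresses of the cluster to spread the load across them
                  client=new RestHighLevelClient(RestClient.builder(
                          new HttpHost("127.0.0.1",9200,"http")
                          //,new HttpHost("192.168.200.100",9200,"http")
                  ));
              }
              return client;
          }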
      
          /**
           * @Author haien
           * @Description Close the client
           * @Date 2019/7/18
           * @Param []
           * @return void
           **/
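          public static synchronized void close() throws IOException {
              if(client!=null){
                  client.close();
                  //allow getESClientConnection() to create a fresh client afterwards
                  client=null;
              }
          }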
      }
    
  • (2) Custom connection settings.

      /**
       * @Author haien
       * @Description Elasticsearch connection configuration
       * @Date 2019/7/23
       **/
      @Configuration
      public class RestClientConfig {
          //Elasticsearch node addresses, each as ip:port (comma-separated in the property)
          @Value("${elasticsearch.ipAddrs}")
          private String[] ipAddrs;
      
          //Maximum time to establish a connection to the target (ms)
          @Value("${elasticsearch.client.connectTimeOut}")
          private Integer connectTimeOut;
      
          //Maximum time to wait for response data (socket read timeout, ms)
          @Value("${elasticsearch.client.socketTimeOut}")
          private Integer socketTimeOut;
      
          //Maximum time to wait for a free connection from the pool (ms)
          @Value("${elasticsearch.client.connectionRequestTime}")
          private Integer connectionRequestTime;
      
          //Maximum total number of connections in the pool
          @Value("${elasticsearch.client.maxConnectNum}")
          private Integer maxConnectNum;
      
          //Maximum number of concurrent connections per route (per node)
          @Value("${elasticsearch.client.maxConnectionPerRoute}")
          private Integer maxConnectionPerRoute;
      
          @Bean
          @Scope("singleton")
          public RestClient restClient(){
              return getFactory().getClient();
          }
      
          //Register the client bean
          @Bean("client")
          @Scope("singleton")
          public RestHighLevelClient restHighClient(){
              //Get the factory via getFactory() below, then get the client from it
              return getFactory().getRestHighClient();
          }
      
          //Client factory bean: Spring calls its init() method after creating it and close() on shutdown
          @Bean(initMethod = "init",destroyMethod = "close")
          public ElasticRestClientFactory getFactory(){
              return ElasticRestClientFactory.build(httpHost(),connectTimeOut,socketTimeOut,
                      connectionRequestTime,maxConnectNum,maxConnectionPerRoute);
          }
      
          /**
           * @Author haien
           * @Description Wrap the Elasticsearch addresses into HttpHost objects
           * @Date 2019/7/23
           * @Param []
           * @return org.apache.http.HttpHost[]
           **/
          @Bean
          public HttpHost[] httpHost(){
              HttpHost[] httpHosts=new HttpHost[ipAddrs.length];
              for (int i=0;i<ipAddrs.length;i++){
                  String[] ipAddr=ipAddrs[i].split(":");
                  //IP, port, scheme
                  httpHosts[i]=new HttpHost(ipAddr[0],
                          Integer.valueOf(ipAddr[1]),"http");
              }
              return httpHosts;
          }
      
      }
      
      /**
       * @Author haien
       * @Description Client connection factory
       * @Date 2019/7/23
       **/
      public class ElasticRestClientFactory {
          private static final Logger logger=
                  LoggerFactory.getLogger(ElasticRestClientFactory.class);
      
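          //Default values, overwritten later by build()
          //Maximum time to establish a connection to the target (ms)
          public static int CONNECT_TIMEOUT_MILLIS=3000;
          //Maximum time to wait for response data (socket read timeout, ms)
          public static int SOCKET_TIMEOUT_MILLIS=6000;
          //Maximum time to wait for a free connection from the pool (ms)
          public static int CONNECTION_REQUEST_TIMEOUT_MILLIS=2000;
          //Maximum total number of connections in the pool
          public static int MAX_CONN_TOTAL=15;
          //Maximum number of concurrent connections per route (per node)
          public static int MAX_CONN_PER_ROUTE=10;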
      
          private static HttpHost[] HTTP_HOST;
          private RestClientBuilder builder;
          private RestClient restClient;
          private RestHighLevelClient restHighLevelClient;
          
          private static ElasticRestClientFactory restClientFactory=
              new ElasticRestClientFactory();
      
          /**
           * @Author haien
           * @Description Init method: build the clients with the configured settings
           * @Date 2019/7/23
           * @Param []
           * @return void
           **/
          public void init(){
              builder=RestClient.builder(HTTP_HOST);
              setConnectTimeOutConfig();
              setMutiConnectConfig();
              
              restClient=builder.build();
              restHighLevelClient=new RestHighLevelClient(builder);
              
              logger.info("ElasticSearch highLevelRestClient init successful");
          }
      
          /**
           * @Author haien
           * @Description Configure the timeouts
           * @Date 2019/7/23
           * @Param []
           * @return void
           **/
          public void setConnectTimeOutConfig(){
          
              builder.setRequestConfigCallback(requestConfigBuilder -> {
                  requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                  requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
                  requestConfigBuilder
                      .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
                  return requestConfigBuilder;
              });
          }
      
          /**
           * @Author haien
           * @Description Set the connection pool limits of the underlying async HTTP client
           * @Date 2019/7/23
           * @Param []
           * @return void
           **/
          public void setMutiConnectConfig(){
              builder.setHttpClientConfigCallback(httpClientBuilder -> {
                  httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
                  httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
                  return httpClientBuilder;
              });
          }
      
          public RestClient getClient(){
              return restClient;
          }
      
          public RestHighLevelClient getRestHighClient(){
              return restHighLevelClient;
          }
          /**
           * @Author haien
           * @Description Build without custom timeouts (the default values above are used)
           * @Date 2019/7/23
           * @Param [httpHost, maxConnectNum, maxConnectPerRoute]
           * @return com.haien.test.elastic.ElasticRestClientFactory
           **/
          public static ElasticRestClientFactory build(HttpHost[] httpHost,
                 Integer maxConnectNum,Integer maxConnectPerRoute){
      
              HTTP_HOST=httpHost;
              MAX_CONN_TOTAL=maxConnectNum;
              MAX_CONN_PER_ROUTE=maxConnectPerRoute;
              return restClientFactory;
          }
      
          /**
           * @Author haien
           * @Description Build with all timeouts set
           * @Date 2019/7/23
           * @Param [httpHost, connectTimeOut, socketTimeOut, connectRequestTime,
           *        maxConnectNum, maxConnectPerRoute]
           * @return com.haien.test.elastic.ElasticRestClientFactory
           **/
          public static ElasticRestClientFactory build(HttpHost[] httpHost,
                        Integer connectTimeOut,
                        Integer socketTimeOut,Integer connectRequestTime,
                        Integer maxConnectNum,Integer maxConnectPerRoute){
      
              HTTP_HOST=httpHost;
              CONNECT_TIMEOUT_MILLIS=connectTimeOut;
              SOCKET_TIMEOUT_MILLIS=socketTimeOut;
              CONNECTION_REQUEST_TIMEOUT_MILLIS=connectRequestTime;
              MAX_CONN_TOTAL=maxConnectNum;
              MAX_CONN_PER_ROUTE=maxConnectPerRoute;
              return restClientFactory;
          }
      
          /**
           * @Author haien
           * @Description Destroy method: close the connections
           * @Date 2019/7/23
           * @Param []
           * @return void
           **/
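          public void close(){
              //init() builds two separate clients from the same builder, so close both
              if(restHighLevelClient!=null){
                  try {
                      restHighLevelClient.close();
                  } catch (IOException e) {
                      logger.error("Failed to close the high-level client",e);
                  }
              }
              if(restClient!=null){
                  try {
                      restClient.close();
                  } catch (IOException e) {
                      logger.error("Failed to close the low-level client",e);
                  }
              }
              logger.info("ElasticSearch highLevelRestClient closed");
          }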
      }
    
  • Getting the client:

      @Component
      public class QueryTest {
    
          @Resource
          private RestHighLevelClient client;
      }
    
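  • Note: the client is registered in the Spring container by the configuration class RestClientConfig, and the values RestClientConfig injects with @Value only end up in that client bean. If you obtain a client with new or in any other way instead of injecting it with @Resource,
    the @Value settings are never applied. Likewise, QueryTest, the class holding the client, is meant to implement the search methods, but it cannot be tested with a main method in the same class: under main, QueryTest can only be created with new, and a QueryTest created that way has a null client field.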
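The `httpHost()` bean in RestClientConfig splits each `elasticsearch.ipAddrs` entry on `:` and assumes every entry carries both a host and a port, so a blank entry or a missing port fails with an `ArrayIndexOutOfBoundsException`. A slightly more defensive parser could trim whitespace, skip empty entries, fall back to port 9200, and fail with a clear message on bad input. The sketch below is plain Java; `HostSpec` and `parse` are hypothetical names, and each result would still be converted with `new HttpHost(host, port, scheme)` before being passed to `RestClient.builder(...)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value object for one parsed "host[:port]" entry. */
final class HostSpec {
    static final int DEFAULT_PORT = 9200;

    final String host;
    final int port;
    final String scheme;

    HostSpec(String host, int port, String scheme) {
        this.host = host;
        this.port = port;
        this.scheme = scheme;
    }

    /** Parses entries like "127.0.0.1:9200" or "es-node-1" (IPv6 literals are not handled). */
    static List<HostSpec> parse(String[] entries, String scheme) {
        List<HostSpec> hosts = new ArrayList<>();
        for (String raw : entries) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas in the property value
            }
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                hosts.add(new HostSpec(entry, DEFAULT_PORT, scheme)); // no port given
                continue;
            }
            String host = entry.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("invalid port in address: " + raw, e);
            }
            if (host.isEmpty() || port < 1 || port > 65535) {
                throw new IllegalArgumentException("invalid address: " + raw);
            }
            hosts.add(new HostSpec(host, port, scheme));
        }
        return hosts;
    }
}
```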

  2. CRUD methods for indices and documents

     public class Test {
    
         private static final Logger logger=LoggerFactory.getLogger(Test.class);
     
         private RestHighLevelClient client=ESClientConnectionUtil.getESClientConnection();
     
         /**
          * @Author haien
          * @Description Create the "books" index with settings and a mapping
          * @Date 2019/7/18
          * @Param []
          * @return boolean
          **/
         public boolean createIndex() throws IOException {
             //Index name
             CreateIndexRequest request=new CreateIndexRequest("books")
                     //Number of shards, number of replicas and refresh interval
                     .settings(Settings.builder()
                             .put("index.number_of_shards",3)
                             .put("index.number_of_replicas",2)
                             .put("refresh_interval","10s")
                             //Tokenizer of the default analyzer (ik_smart requires the IK analysis plugin)
                             //.put("analysis.analyzer.default.tokenizer","ik_smart")
                     );
     
             //Define the fullText field (ik_max_word and ik_smart come from the IK analysis plugin)
             Map<String,Object> fullText=new HashMap<>();
             fullText.put("type","text");
             fullText.put("analyzer","ik_max_word");
             fullText.put("search_analyzer","ik_smart");
             fullText.put("term_vector","with_positions_offsets");
     
             //Define the fondCode field as an exact-value keyword
             Map<String,Object> fondCode=new HashMap<>();
             fondCode.put("type","keyword");
     
             Map<String,Object> properties=new HashMap<>();
             properties.put("fullText",fullText);
             properties.put("fondCode",fondCode);
     
             Map<String,Object> mapping=new HashMap<>();
             mapping.put("properties",properties);
     
             request.mapping(mapping);
     
             /*Map<String, Object> message = new HashMap<>();
             message.put("type", "text");
             Map<String, Object> properties = new HashMap<>();
             properties.put("message", message);
             Map<String, Object> mapping = new HashMap<>();
             mapping.put("properties", properties);
             request.mapping(mapping);*/
     
     /*
     
             request.mapping(
                     "{\n" +
                             "  \"properties\": {\n" +
                             "    \"message\": {\n" +
                             "      \"type\": \"text\"\n" +
                             "    }\n" +
                             "  }\n" +
                             "}",
                     XContentType.JSON);
     */
     
             CreateIndexResponse response=
                     client.indices().create(request,RequestOptions.DEFAULT);
     
             boolean acknowledged=response.isAcknowledged();
             return acknowledged;
         }
     
         /**
          * @Author haien
          * @Description Delete an index
          * @Date 2019/7/21
          * @Param [indexName]
          * @return boolean
          **/
         public boolean deleteIndex(String indexName) throws IOException {
             //Check whether the index exists
             if(!existIndex(indexName))
                 return false;
     
             DeleteIndexRequest request=new DeleteIndexRequest(indexName);
             AcknowledgedResponse response=
                     client.indices().delete(request, RequestOptions.DEFAULT);
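             //Without the existence check above, deleting a missing index throws an exception (index_not_found_exception) instead of returning false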
             return response.isAcknowledged();
         }
     
         /**
          * @Author haien
          * @Description Check whether an index exists
          * @Date 2019/7/21
          * @Param [indexName]
          * @return boolean
          **/
         public boolean existIndex(String indexName) throws IOException {
             Assert.notNull(indexName,"Index name must not be null");
             GetIndexRequest request=new GetIndexRequest(indexName);
             return client.indices().exists(request, RequestOptions.DEFAULT);
         }
     
         /**
          * @Author haien
          * @Description Add a document
          * @Date 2019/7/21
          * @Param [indexName, id, docMap]
          * @return boolean
          **/
         public boolean addDocument(String indexName,String id,Map<String,Object> docMap)
                 throws IOException {
             /*
             * Four ways to supply the document source: a JSON string, key-value pairs, a Map, or an XContentBuilder
             * */
             /*
             IndexRequest request=new IndexRequest("posts");
             request.id("1");
             String jsonString="{" +
                     "\"user\":\"kimchy\"," +
                     "\"postDate\":\"2013-01-30\"," +
                     "\"message\":\"trying out Elasticsearch\"" +
                     "}";
             //Use the string as the JSON source; source() returns the IndexRequest itself
             request.source(jsonString,XContentType.JSON);
     */
             /*or*/
             /*
             IndexRequest request=new IndexRequest("posts").id("1")
                     //key-value pairs, converted to JSON automatically
                     .source("user","kimchy",
                         "postDate",new Date(),
                         "message","trying out Elasticsearch");*/
     
             /*or*/
             /*
             Map<String,Object> jsonMap=new HashMap<>();
             jsonMap.put("user","kimchy");
             jsonMap.put("postDate",new Date());
             jsonMap.put("message","trying out Elasticsearch");
             //the Map is converted to JSON automatically
             IndexRequest request=new IndexRequest("posts").id("1").source(jsonMap);
     */
             /*or*/
             //XContentBuilder: a helper for building JSON
             /*
             XContentBuilder builder=XContentFactory.jsonBuilder();
             builder.startObject();
             {
                 builder.field("user","kimchy");
                 builder.field("postDate",new Date());
                 builder.field("message","trying out Elasticsearch");
             }
             builder.endObject();
             IndexRequest request=new IndexRequest("posts").id("1").source(builder);*/
     
             boolean opFlag=Boolean.TRUE;
             try {
                 Assert.notNull(indexName, "Index name must not be null");
                 Assert.notNull(id, "Document id must not be null");
                 Assert.notNull(docMap, "Document docMap must not be null");
     
                 //No type given: in 7.x the type is always _doc
                 IndexRequest request = new IndexRequest(indexName).id(id).source(docMap);
                 //CREATE: fail instead of overwriting when the id already exists
                 request.opType(DocWriteRequest.OpType.CREATE);
     
                 client.index(request, RequestOptions.DEFAULT);
             } catch (Exception e){
                 logger.error("Failed to add document, index [{}], document id [{}]",
                         indexName,id,e);
                 opFlag=Boolean.FALSE;
             }
             return opFlag;
     
         }
     
         /**
          * @Author haien
          * @Description Get a document
          * @Date 2019/7/21
          * @Param [indexName, id]
          * @return java.util.Map<java.lang.String,java.lang.Object>
          **/
         public Map<String,Object> getDocument(String indexName,String id){
             if(!existDocument(indexName,id)){
                 logger.error("Index or document id does not exist, index [{}], document id [{}]",
                         indexName,id);
                 return null;
             }
     
             Map<String,Object> docMap=null;
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(id,"Document id must not be null");
     
                 GetRequest request = new GetRequest(indexName,id);
                 GetResponse response=client.get(request,RequestOptions.DEFAULT);
                 docMap=response.getSourceAsMap();
             } catch (Exception e){
                 logger.error("Failed to get document by id, index [{}], document id [{}]",
                         indexName,id,e);
             }
     
             return docMap;
         }
     
         /**
          * @Author haien
          * @Description Check whether a document exists
          * @Date 2019/7/21
          * @Param [indexName, id]
          * @return boolean
          **/
         public boolean existDocument(String indexName,String id){
             boolean opFlag=Boolean.FALSE;
     
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(id,"Document id must not be null");
     
                 GetRequest request=new GetRequest(indexName,id);
                 //Existence check only: fetch neither _source nor stored fields
                 request.fetchSourceContext(new FetchSourceContext(false));
                 request.storedFields("_none_");
                 opFlag=client.exists(request,RequestOptions.DEFAULT);
             } catch (IOException e) {
                 //On failure opFlag stays false
                 logger.error("Failed to check whether document exists, index [{}], document id [{}]",
                         indexName,id,e);
             }
     
             return opFlag;
         }
     
         /**
          * @Author haien
          * @Description Delete a document
          * @Date 2019/7/21
          * @Param [indexName, id]
          * @return boolean
          **/
         public boolean deleteDocument(String indexName,String id){
             boolean opFlag=Boolean.FALSE;
             try{
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(id,"Document id must not be null");
     
                 //Delete only if the document exists; otherwise return false
                 GetRequest request=new GetRequest(indexName,id);
                 request.fetchSourceContext(new FetchSourceContext(false));
                 request.storedFields("_none_");
                 opFlag=client.exists(request,RequestOptions.DEFAULT);
                 if(opFlag){
                     DeleteRequest request1=new DeleteRequest(indexName,id);
                     client.delete(request1,RequestOptions.DEFAULT);
                 }
             } catch (IOException e) {
                 opFlag=Boolean.FALSE;
                 logger.error("Failed to delete document, index [{}], document id [{}]",
                         indexName,id,e);
             }
     
             return opFlag;
         }
     
         /**
          * @Author haien
          * @Description Update a document
          * @Date 2019/7/21
          * @Param [indexName, id, docMap]
          * @return boolean
          **/
         public boolean updateDocument(String indexName,String id,
                 Map<String,Object> docMap){
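             boolean opFlag=Boolean.TRUE;
     
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(id,"Document id must not be null");
                 Assert.notNull(docMap,"Document docMap must not be null");
     
                 //Partial update: the fields in docMap are merged into the existing document
                 UpdateRequest request=new UpdateRequest(indexName,id).doc(docMap);
                 client.update(request,RequestOptions.DEFAULT);
             } catch (Exception e) {
                 //IOException, or an ElasticsearchException e.g. when the document does not exist
                 opFlag=Boolean.FALSE;
                 logger.error("Failed to update document, index [{}], document id [{}]",indexName,id,e);
             }
     
             return opFlag;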
         }
     
         /**
          * @Author haien
          * @Description Add documents in bulk
          * @Date 2019/7/21
          * @Param [indexName, docMaps]
          * @return boolean
          **/
         public boolean bulkAddDocument(String indexName, List<Map<String,Object>> docMaps){
             boolean opFlag=Boolean.TRUE;
             BulkResponse responses=null;
     
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(docMaps,"Document list docMaps must not be null");
     
                 BulkRequest request=new BulkRequest();
                 for (int i=0;i<docMaps.size();i++){
                     Map<String,Object> docMap=docMaps.get(i);
                     //each map must carry an "id" entry, used as the document _id
                     request.add(new IndexRequest(indexName).id(docMap.get("id")+"")
                             .source(docMap).opType(DocWriteRequest.OpType.CREATE));
                 }
                 responses=client.bulk(request,RequestOptions.DEFAULT);
             } catch (IOException e) {
                 opFlag=Boolean.FALSE;
                 logger.error("Bulk add failed, index [{}]",indexName,e);
             }
             
             //Handle the response item by item (a bulk call can partially fail)
             if(responses!=null){
                 for(BulkItemResponse bulkItemResponse:responses){
                     if(bulkItemResponse.isFailed()){
                         opFlag=Boolean.FALSE;
                         logger.error("Bulk item failed: {}",bulkItemResponse.getFailureMessage());
                         continue;
                     }
                     DocWriteResponse itemResponse=bulkItemResponse.getResponse();
                     //the operation added a document
                     if(bulkItemResponse.getOpType()==DocWriteRequest.OpType.INDEX
                         ||bulkItemResponse.getOpType()==DocWriteRequest.OpType.CREATE){
                         IndexResponse indexResponse=(IndexResponse)itemResponse;
                         //handle a successful add here
                         
                     }else if(bulkItemResponse.getOpType() == 
                             DocWriteRequest.OpType.DELETE) {
                         DeleteResponse deleteResponse=(DeleteResponse)itemResponse;
                         //handle a successful delete here
                     }
                 }
             }
     
             return opFlag;
         }
     
         /**
          * @Author haien
          * @Description Update documents in bulk
          * @Date 2019/7/21
          * @Param [indexName, docMaps]
          * @return boolean
          **/
         public boolean bulkUpdateDocument(String indexName,
                 List<Map<String,Object>> docMaps){
                 
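             boolean opFlag=Boolean.TRUE;
     
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(docMaps,"Document list docMaps must not be null");
     
                 BulkRequest request=new BulkRequest();
                 for (int i=0;i<docMaps.size();i++){
                     Map<String,Object> docMap=docMaps.get(i);
                     request.add(
                             new UpdateRequest(indexName,docMap.get("id")+"").doc(docMap));
                 }
                 BulkResponse response=client.bulk(request,RequestOptions.DEFAULT);
                 //failed items (e.g. a missing document) do not throw; check hasFailures()
                 if(response.hasFailures()){
                     opFlag=Boolean.FALSE;
                     logger.error("Bulk update partially failed: {}",response.buildFailureMessage());
                 }
             } catch (IOException e) {
                 opFlag=Boolean.FALSE;
                 logger.error("Bulk update failed, index [{}]",indexName,e);
             }
     
             return opFlag;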
         }
     
         /**
          * @Author haien
          * @Description Delete documents in bulk
          * @Date 2019/7/25
          * @Param [indexName, docMaps]
          * @return boolean
          **/
         public boolean bulkdeleteDocument(String indexName,
                 List<Map<String,Object>> docMaps){
                 
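             boolean opFlag=Boolean.TRUE;
     
             try {
                 Assert.notNull(indexName,"Index name must not be null");
                 Assert.notNull(docMaps,"Document list docMaps must not be null");
     
                 BulkRequest request=new BulkRequest();
                 for (int i=0;i<docMaps.size();i++){
                     Map<String,Object> docMap=docMaps.get(i);
                     request.add(new DeleteRequest(indexName,docMap.get("id")+""));
                 }
                 BulkResponse response=client.bulk(request,RequestOptions.DEFAULT);
                 //failed items do not throw; check hasFailures()
                 if(response.hasFailures()){
                     opFlag=Boolean.FALSE;
                     logger.error("Bulk delete partially failed: {}",response.buildFailureMessage());
                 }
             } catch (IOException e) {
                 opFlag=Boolean.FALSE;
                 logger.error("Bulk delete failed, index [{}]",indexName,e);
             }
     
             return opFlag;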
         }
     }
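`BulkRequest` hides the wire format that the low-level client would make you build by hand. The `_bulk` endpoint takes newline-delimited JSON: an action line such as `{"create":{"_index":...,"_id":...}}` followed (for create, index and update) by one source line, with the whole body ending in a newline. The sketch below builds such a body for create actions in plain Java. `BulkBody` is a hypothetical helper; documents are passed as already-serialized single-line JSON strings to avoid pulling in a JSON library, and the escaping of index names and ids is deliberately minimal.

```java
import java.util.Map;

/** Hypothetical helper that builds a _bulk request body (NDJSON) of create actions. */
final class BulkBody {
    private BulkBody() {}

    /** Minimal JSON string quoting: escapes backslash and double quote only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * @param index   target index name
     * @param sources document id mapped to its already-serialized, single-line JSON source
     */
    static String createActions(String index, Map<String, String> sources) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> e : sources.entrySet()) {
            if (e.getValue().indexOf('\n') >= 0) {
                // a raw newline would split the source across two NDJSON lines
                throw new IllegalArgumentException("source for id " + e.getKey() + " spans several lines");
            }
            // action/metadata line
            body.append("{\"create\":{\"_index\":").append(quote(index))
                .append(",\"_id\":").append(quote(e.getKey())).append("}}\n");
            // source line
            body.append(e.getValue()).append('\n');
        }
        return body.toString(); // ends with '\n', as the bulk API requires
    }
}
```

With the low-level client, a body like this would be sent as the payload of a `POST /_bulk` request; with the high-level client, `bulkAddDocument` above produces the equivalent request from `IndexRequest` objects.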
    