Elasticsearch Learning Notes from Scratch (4) - Integrating with Spring Boot 2.x

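1. My Spring Boot version is 2.2.6.RELEASE. This post uses elasticsearch-rest-high-level-client to integrate Elasticsearch into Spring Boot. The first step is to find the elasticsearch-rest-high-level-client version that matches your Elasticsearch version.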

   1.1 Go to the Elasticsearch website: https://www.elastic.co/en/elasticsearch/

   1.2 Open the docs page.

   1.3 Search for "ElasticSearch Client". In the left sidebar, select Clients and the Elasticsearch version you are running.

   1.4 Scroll down to Java REST Client.

   1.5 Click through until you reach the page that lists the dependency to add; the package version is shown there.


2. Copy the dependency snippet into pom.xml

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.14.1</version>
</dependency>

3. Add the Elasticsearch properties to the Spring Boot yml file

elasticsearch:
    schema: http              # request protocol
    clusterNodes: 10.67.9.31:9200,10.67.9.31:9201
    connectTimeout: 1000      # connect timeout (ms)
    socketTimeout: 30000      # socket timeout (ms)
    connRequestTimeout: 500   # connection request timeout (ms)
    index:
      numberOfShards: 5       # number of shards
      numberOfReplicas: 1     # number of replicas

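4. Write the ESProperties class that reads the Elasticsearch configuration

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration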
@ConfigurationProperties("elasticsearch")
public class ESProperties {
    /**
     * Request protocol
     */
    private String schema;

    /**
     * Cluster nodes, as a comma-separated list of host:port
     */
    private String clusterNodes;

    /**
     * Connect timeout (ms)
     */
    private Integer connectTimeout;

    /**
     * Socket timeout (ms)
     */
    private Integer socketTimeout;

    /**
     * Connection request timeout (ms)
     */
    private Integer connRequestTimeout;

    /**
     * Index settings
     */
    private Index index;
    
    

    public String getSchema() {
		return schema;
	}



	public void setSchema(String schema) {
		this.schema = schema;
	}



	public String getClusterNodes() {
		return clusterNodes;
	}



	public void setClusterNodes(String clusterNodes) {
		this.clusterNodes = clusterNodes;
	}



	public Integer getConnectTimeout() {
		return connectTimeout;
	}



	public void setConnectTimeout(Integer connectTimeout) {
		this.connectTimeout = connectTimeout;
	}



	public Integer getSocketTimeout() {
		return socketTimeout;
	}



	public void setSocketTimeout(Integer socketTimeout) {
		this.socketTimeout = socketTimeout;
	}



	public Integer getConnRequestTimeout() {
		return connRequestTimeout;
	}



	public void setConnRequestTimeout(Integer connRequestTimeout) {
		this.connRequestTimeout = connRequestTimeout;
	}



	public Index getIndex() {
		return index;
	}



	public void setIndex(Index index) {
		this.index = index;
	}



    /**
     * Index settings
     */
    public static class Index {

        /**
         * Number of shards
         */
        private Integer numberOfShards;

        /**
         * Number of replicas
         */
        private Integer numberOfReplicas;

		public Integer getNumberOfShards() {
			return numberOfShards;
		}

		public void setNumberOfShards(Integer numberOfShards) {
			this.numberOfShards = numberOfShards;
		}

		public Integer getNumberOfReplicas() {
			return numberOfReplicas;
		}

		public void setNumberOfReplicas(Integer numberOfReplicas) {
			this.numberOfReplicas = numberOfReplicas;
		}
        
        

    }
}

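5. Write the ESConfig class that builds the RestHighLevelClient

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Assumption: StringUtils is Spring's utility class; swap the import if you use another one.
import org.springframework.util.StringUtils;
// BusinessException is this project's own exception class; import it from your own package.

@Configuration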
public class ESConfig {
	
	@Autowired
	ESProperties esProperties;
	
	@Bean
    public RestHighLevelClient getRestHighLevelClient(){
		String nodes = esProperties.getClusterNodes();
		if (StringUtils.isEmpty(nodes)) {
			throw new BusinessException("No Elasticsearch nodes configured");
		}
		List<HttpHost> hostList = new ArrayList<HttpHost>();
		String[] nodesArr = nodes.split(",");
		for (String node : nodesArr) {
			String[] nodeArr = node.split(":");
			if (nodeArr.length != 2) {
				throw new BusinessException("Elasticsearch node must be in host:port format");
			}
			HttpHost host = new HttpHost(nodeArr[0], Integer.parseInt(nodeArr[1]), esProperties.getSchema());
			hostList.add(host);
		}
		RestClientBuilder builder =  RestClient.builder(hostList.toArray(new HttpHost[0]));
		RestHighLevelClient client = getRestHighLevelClient(builder,esProperties);
		return client;
    }
	
	
    private static RestHighLevelClient getRestHighLevelClient(RestClientBuilder builder,ESProperties esProperties) {
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(esProperties.getConnectTimeout());
            requestConfigBuilder.setSocketTimeout(esProperties.getSocketTimeout());
            requestConfigBuilder.setConnectionRequestTimeout(esProperties.getConnRequestTimeout());
            return requestConfigBuilder;
        });
        
        return new RestHighLevelClient(builder);
    }
}
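
The node parsing inside getRestHighLevelClient can be looked at in isolation. The following is a minimal sketch using only the JDK; NodeParser and Node are hypothetical names that do not appear in the code above, and the trimming and port range check are additions of this sketch, not something ESConfig does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the post's code): parses "host:port,host:port". */
final class NodeParser {

    /** Simple value holder for one parsed node. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits the clusterNodes string and validates every entry. */
    static List<Node> parse(String clusterNodes) {
        if (clusterNodes == null || clusterNodes.trim().isEmpty()) {
            throw new IllegalArgumentException("no Elasticsearch nodes configured");
        }
        List<Node> nodes = new ArrayList<>();
        for (String raw : clusterNodes.split(",")) {
            // Tolerate spaces around the commas and the colon
            String[] parts = raw.trim().split(":");
            if (parts.length != 2 || parts[0].trim().isEmpty()) {
                throw new IllegalArgumentException("node must be host:port, got: " + raw);
            }
            int port;
            try {
                port = Integer.parseInt(parts[1].trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("port is not a number: " + raw, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + raw);
            }
            nodes.add(new Node(parts[0].trim(), port));
        }
        return nodes;
    }
}
```

Each parsed Node could then be turned into new HttpHost(node.host, node.port, scheme), which is what the loop in ESConfig does inline.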

6. Write the ESService class that performs operations against Elasticsearch


import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.GetSourceRequest;
import org.elasticsearch.client.core.GetSourceResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import com.basic.util.JsonUtils;


@Service
public class ESService {
	
	Logger log = LoggerFactory.getLogger(ESService.class);
	
	@Qualifier("getRestHighLevelClient")
    @Autowired
    private RestHighLevelClient client;
	
    /**
     * Create an index (does nothing if it already exists)
     * @param index    index name
     * @param shards   number of shards
     * @param replicas number of replicas
     */
    public void createIndexRequest(String index, Integer shards, Integer replicas) {
        if (getIndexRequest(index)) {
            return;
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
                .settings(Settings.builder().put("index.number_of_shards", shards).put("index.number_of_replicas", replicas));
        try {
            client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Failed to create index [{}]", index, e);
        }
    }

    /**
     * Delete an index
     * @param index index name
     * @return true if the deletion was acknowledged
     */
    public boolean deleteIndexRequest(String index) {
        DeleteIndexRequest request = new DeleteIndexRequest(index);
        try {
            AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        } catch (IOException e) {
            log.error("Failed to delete index [{}]", index, e);
        }
        return false;
    }
    
    /**
     * Check whether an index exists
     * @param index index name
     * @return true if the index exists
     */
    public boolean getIndexRequest(String index) {
        GetIndexRequest request = new GetIndexRequest(index);
        try {
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Failed to look up index [{}]", index, e);
        }
        return false;
    }

    /**
     * Update a document
     * @param index  index name
     * @param id     document id
     * @param object new document content
     */
    public void updateRequest(String index, String id, Object object) {
        UpdateRequest updateRequest = new UpdateRequest(index, id);
        updateRequest.doc(JsonUtils.parseMap(JsonUtils.toJson(object)), XContentType.JSON);
        try {
            client.update(updateRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Failed to update document in index [{}], data [{}]", index, object, e);
        }
    }

    /**
     * Insert a document
     * @param index  index name
     * @param id     document id
     * @param object document content
     */
    public void insertRequest(String index, String id, Object object) {
        IndexRequest indexRequest = new IndexRequest(index)
            .id(id)
            .source(JsonUtils.parseMap(JsonUtils.toJson(object)), XContentType.JSON);
        try {
            client.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Failed to insert document into index [{}], data [{}]", index, object, e);
        }
    }

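    /**
     * Bulk-insert documents
     * @param index   index name
     * @param objList documents to insert
     * @return true if every item succeeded
     */
    public boolean insertBulkRequest(String index, List<Object> objList) {
        BulkRequest bulkRequest = new BulkRequest();
        for (Object obj : objList) {
            Map<String, Object> map = JsonUtils.parseMap(JsonUtils.toJson(obj));
            IndexRequest indexRequest = new IndexRequest(index).source(map, XContentType.JSON);
            // Use the document's own "id" field when present; otherwise let Elasticsearch generate one.
            // (The two branches were swapped originally, which caused a NullPointerException for documents without an id.)
            if (map.get("id") != null) {
                indexRequest.id(map.get("id").toString());
            }
            bulkRequest.add(indexRequest);
        }
        try {
            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            // hasFailures() is true when at least one item failed, so negate it
            return !bulk.hasFailures();
        } catch (IOException e) {
            log.error("Bulk insert into index [{}] failed", index, e);
        }
        return false;
    }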
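
    The id rule used by the bulk insert (take the document's own "id" field if it has one, otherwise leave the id unset so Elasticsearch generates it) can be written as a small standalone function. This is only a sketch with JDK types; BulkIdRule and explicitId are hypothetical names, not part of ESService.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of ESService): decides whether a document carries its own id. */
final class BulkIdRule {

    /**
     * Returns the value of the "id" field as a String when the document has one,
     * or an empty Optional when the id should be left to Elasticsearch.
     */
    static Optional<String> explicitId(Map<String, Object> doc) {
        Object id = doc.get("id");
        return id == null ? Optional.empty() : Optional.of(id.toString());
    }
}
```

    With such a helper the loop body would reduce to something like BulkIdRule.explicitId(map).ifPresent(indexRequest::id), which keeps the null check in one place.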

    /**
     * Delete a document
     * @param index index name
     * @param id    document id
     */
    public void deleteRequest(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            client.delete(deleteRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Failed to delete document [{}] from index [{}]", id, index, e);
        }
    }
    
    /**
     * Fetch the source of a single document
     * @param index index name
     * @param id    document id
     * @return the document source, or null if the request failed
     */
    public Map<String, Object> sourceRequest(String index, String id) {
        GetSourceRequest getSourceRequest = new GetSourceRequest(index, id);
        try {
            GetSourceResponse response = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
            return response.getSource();
        } catch (IOException e) {
            log.error("Failed to fetch document [{}] from index [{}]", id, index, e);
        }
        return null;
    }
}

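7. Write a unit test class

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = {HonneyBeeApiApplication.class})
@RunWith(SpringRunner.class)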
public class ESTest {
	
	@Autowired
	ESService service;
	
	@Test
    public void createIndex() {
		service.createIndexRequest("project_task",5,1);
    }

}

   7.1 Running the test fails with an error.

       On inspection, the packages that elasticsearch-rest-high-level-client depends on were resolving to 6.x versions. This is most likely Spring Boot 2.2.x's dependency management pinning them to its own Elasticsearch version.

       Change the dependencies in pom.xml so that all three artifacts are declared explicitly with the same version:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.14.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.14.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.14.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>

   7.2 Running the unit test again fails with:

Caused by: java.lang.invoke.LambdaConversionException: Invalid receiver type interface org.apache.http.Header; not a subtype of implementation type interface org.apache.http.NameValuePair
	at java.lang.invoke.AbstractValidatingLambdaMetafactory.validateMetafactoryArgs(AbstractValidatingLambdaMetafactory.java:233)
	at java.lang.invoke.LambdaMetafactory.metafactory(LambdaMetafactory.java:303)
	at java.lang.invoke.CallSite.makeSite(CallSite.java:302)
	... 80 more
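
An error like this often means that a library is on the classpath in a different version than the one the client was compiled against. One possible way to check is to print where a class is actually loaded from. The sketch below uses only the JDK; JarLocator is a hypothetical helper name, and the class name to pass in (for example org.apache.http.Header from the stack trace) is up to you.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports which jar or directory a class is loaded from. */
final class JarLocator {

    /** Returns the code source location of the named class, or a short marker string. */
    static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run
            Class<?> type = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Typical for classes that ship with the JDK itself
                return "(JDK or unknown location)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }
}
```

Calling System.out.println(JarLocator.locate("org.apache.http.Header")) from inside the application or a test prints the jar the class comes from, and the version in the jar file name can then be compared with the version the Elasticsearch client expects.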

       I changed the versions of the two httpclient packages.

   7.3 Running the test again succeeds.

8. Check the new index [project_task] with the head plugin. It is there, so the integration is complete.
