1. 我的springboot版本为2.2.6.RELEASE, 本文推荐elasticsearch-rest-high-level-client在 springboot里集成elasticsearch,首先找到当前es对应的elasticsearch-rest-high-level-client版本
1.1 登录es官网https://www.elastic.co/en/elasticsearch/
1.2 找到doc页面
1.2 搜索ElasticSearch Client,页面左边选择clients和对应的es版本
1.3 往下拉找到Java Rest Client
1.4 点进去一步一步找到需要依赖的包的版本页面,包的版本位置如下
2. 拷贝依赖代码到pom.xml里
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.1</version>
</dependency>
3. springboot yml里配置es properties
elasticsearch:
schema: http #请求协议
clusterNodes: 10.67.9.31:9200,10.67.9.31:9201
connectTimeout: 1000 #连接超时时间(毫秒)
socketTimeout: 30000 # socket 超时时间
connRequestTimeout: 500 #连接请求超时时间
index:
numberOfShards: 5 #分片数量
numberOfReplicas: 1 #副本数
4. 编写EsProperties类获取es配置
@Configuration
@ConfigurationProperties("elasticsearch")
public class ESProperties {
/**
* 请求协议
*/
private String schema;
/**
* 集群节点
*/
private String clusterNodes;
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeout;
/**
* socket 超时时间
*/
private Integer socketTimeout;
/**
* 连接请求超时时间
*/
private Integer connRequestTimeout;
/**
* 索引配置信息
*/
private Index index;
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getClusterNodes() {
return clusterNodes;
}
public void setClusterNodes(String clusterNodes) {
this.clusterNodes = clusterNodes;
}
public Integer getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
}
public Integer getSocketTimeout() {
return socketTimeout;
}
public void setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
}
public Integer getConnRequestTimeout() {
return connRequestTimeout;
}
public void setConnRequestTimeout(Integer connRequestTimeout) {
this.connRequestTimeout = connRequestTimeout;
}
public Index getIndex() {
return index;
}
public void setIndex(Index index) {
this.index = index;
}
/**
* 索引配置信息
*/
public static class Index {
/**
* 分片数量
*/
private Integer numberOfShards;
/**
* 副本数量
*/
private Integer numberOfReplicas;
public Integer getNumberOfShards() {
return numberOfShards;
}
public void setNumberOfShards(Integer numberOfShards) {
this.numberOfShards = numberOfShards;
}
public Integer getNumberOfReplicas() {
return numberOfReplicas;
}
public void setNumberOfReplicas(Integer numberOfReplicas) {
this.numberOfReplicas = numberOfReplicas;
}
}
}
5. 编写ESConfig类获取RestHighLevelClient
@Configuration
public class ESConfig {
@Autowired
ESProperties esProperties;
@Bean
public RestHighLevelClient getRestHighLevelClient(){
String nodes = esProperties.getClusterNodes();
if (StringUtils.isEmpty(nodes)) {
throw new BusinessException("未配置es节点");
}
List<HttpHost> hostList = new ArrayList<HttpHost>();
String[] nodesArr = nodes.split(",");
for (String node : nodesArr) {
String[] nodeArr = node.split(":");
if (nodeArr.length != 2) {
throw new BusinessException("es节点必须是 host:port 格式");
}
HttpHost host = new HttpHost(nodeArr[0], Integer.parseInt(nodeArr[1]), esProperties.getSchema());
hostList.add(host);
}
RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
RestHighLevelClient client = getRestHighLevelClient(builder,esProperties);
return client;
}
private static RestHighLevelClient getRestHighLevelClient(RestClientBuilder builder,ESProperties esProperties) {
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(esProperties.getConnectTimeout());
requestConfigBuilder.setSocketTimeout(esProperties.getSocketTimeout());
requestConfigBuilder.setConnectionRequestTimeout(esProperties.getConnRequestTimeout());
return requestConfigBuilder;
});
return new RestHighLevelClient(builder);
}
}
6. 编写ESService类对ES进行操作
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.GetSourceRequest;
import org.elasticsearch.client.core.GetSourceResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import com.basic.util.JsonUtils;
@Service
public class ESService {
Logger log = LoggerFactory.getLogger(ESService.class);
@Qualifier("getRestHighLevelClient")
@Autowired
private RestHighLevelClient client;
/**
* 创建索引
* @param index 索引名字
* @param shards 分片数量
* @param replicas 副本数量
*/
public void createIndexRequest(String index, Integer shards, Integer replicas) {
if (getIndexRequest(index)) {
return;
}
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
.settings(Settings.builder().put("index.number_of_shards", shards).put("index.number_of_replicas", replicas));
try {
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("创建索引库【{}】失败", index, e);
}
}
/**
* 删除索引
* @param index
*/
public boolean deleteIndexRequest(String index) {
DeleteIndexRequest request = new DeleteIndexRequest(index);
try {
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
return response.isAcknowledged();
} catch (IOException e) {
log.error("删除索引库【{}】失败", index, e);
}
return false;
}
/**
* 获取索引
* @param index
* @return
*/
public boolean getIndexRequest(String index) {
GetIndexRequest request = new GetIndexRequest(index);
try {
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
return exists;
} catch (IOException e) {
log.error("获取索引库【{}】失败", index, e);
}
return false;
}
/**
* 更新文档内容
* @param index
* @param id
* @param object
*/
public void updateRequest(String index, String id, Object object) {
UpdateRequest updateRequest = new UpdateRequest(index, id);
updateRequest.doc(JsonUtils.parseMap(JsonUtils.toJson(object)), XContentType.JSON);
try {
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("更新索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
}
}
/**
* 插入文档
* @param index
* @param id
* @param object
*/
public void insertRequest(String index, String id, Object object) {
IndexRequest indexRequest = new IndexRequest(index)
.id(id)
.source(JsonUtils.parseMap(JsonUtils.toJson(object)), XContentType.JSON);
try {
client.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("创建索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
}
}
/**
* 批量插入文档
* @param index
* @param objList
*/
public boolean insertBulkRequest(String index, List<Object> objList) {
BulkRequest bulkRequest = new BulkRequest();
for(Object obj : objList){
Map<String, Object> map = JsonUtils.parseMap(JsonUtils.toJson(obj));
if (map.get("id") != null) {
bulkRequest.add(
new IndexRequest(index)
.source(map, XContentType.JSON)
);
} else {
bulkRequest.add(
new IndexRequest(index)
.id(map.get("id").toString())
.source(map, XContentType.JSON)
);
}
}
try {
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return bulk.hasFailures();
} catch (IOException e) {
log.error("批量插入索引文档 {" + index + "} 失败", e);
}
return false;
}
/**
* 删除文档
* @param index
* @param id
*/
public void deleteRequest(String index, String id) {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
try {
client.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("删除索引文档 {" + index + "} 数据id {" + id + "} 失败", e);
}
}
/**
* 查询某个索引文档
* @param index
* @param id
* @return
*/
public Map<String, Object> sourceRequest(String index, String id) {
GetSourceRequest getSourceRequest = new GetSourceRequest(
index,
id);
try {
GetSourceResponse response = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
return response.getSource();
} catch (IOException e) {
log.error("查询索引文档 {" + index + "} 数据id {" + id + "} 失败", e);
}
return null;
}
}
7. 编写单元测试类
@SpringBootTest(classes = {HonneyBeeApiApplication.class})
@RunWith(SpringRunner.class)
public class ESTest {
@Autowired
ESService service;
@Test
public void createIndex() {
service.createIndexRequest("project_task",5,1);
}
}
7.1. 跑起来报错
看了下是因为elasticsearch-rest-high-level-client这个包依赖了6.几的包
修改pom里的包依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.14.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.14.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
7.2 再次跑单元测试类,提示
Caused by: java.lang.invoke.LambdaConversionException: Invalid receiver type interface org.apache.http.Header; not a subtype of implementation type interface org.apache.http.NameValuePair
at java.lang.invoke.AbstractValidatingLambdaMetafactory.validateMetafactoryArgs(AbstractValidatingLambdaMetafactory.java:233)
at java.lang.invoke.LambdaMetafactory.metafactory(LambdaMetafactory.java:303)
at java.lang.invoke.CallSite.makeSite(CallSite.java:302)
... 80 more
把下面2个httpclient包的版本修改了下
7.3 再次执行,成功
8. 通过head插件查看新增的index【project_task】,成功,至此,整合成功
Elasticseach 从零开始学习记录(三) - 分布式部署