1.创建一个mavan项目,项目的以来配置如下。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.es</groupId>
<artifactId>es-test</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies> <dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.3.2</version>
</dependency> <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency> <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency> <dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
</project>
2.建立连接elasticsearch的工具类。
package com.util; import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException; public class ESUtils {
public static final String INDEX_NAME="userindex"; public static String getIndexName(){
return INDEX_NAME;
}
public static final String TYPE_NAME="tweet"; public static String getTypeName(){
return TYPE_NAME;
} public static Client getClient(){
Settings settings = Settings.builder()
//指定集群名称
.put("cluster.name","my-application")
//探测集群中机器状态
.put("client.transport.sniff",true).build(); //创建客户端
Client client = null;
try {
client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"),9300));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
} public static void closeClient(Client client){
if(null != client){
client.close();
}
}
}
3.创建索引
/**
* 创建索引
* 如果创建的索引名称已经存在,创建会抛出异常,因此先查询要创建的索引是否已存在,不存在在创建
*/
@Test
public void testCreate(){
//待创建的索引名称
String indexName = "userindex";
//获取连接
Client client = ESUtils.getClient();
//判断索引是否存在
IndicesExistsRequest existsRequest = new IndicesExistsRequest(indexName);
IndicesExistsResponse existsResponse = client.admin().indices().exists(existsRequest).actionGet();
if(!existsResponse.isExists()){
//创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet();
//判断创建是否成功
if (createIndexResponse.isAcknowledged()){
logger.info("创建成功");
}else{
logger.info("创建失败");
}
}else {
logger.info("索引已存在,无法创建");
}
//关闭连接
ESUtils.closeClient(client);
}
4.向索引添加数据
/**
* 向索引中逐条加入数据
*/
@Test
public void add(){
//获取连接
Client client = ESUtils.getClient();
//待添加的json数据
String strJson ="{\"age\":25,\"name\":\"张三\",\"weight\":76,\"married\":true}";
XContentParser parser = null;
try {
//数据解析,因为不支持json和对象类型的数据,要将其转为map
parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,DeprecationHandler.THROW_UNSUPPORTED_OPERATION,strJson);
//插入数据
IndexResponse indexResponse = client.prepareIndex().setIndex(ESUtils.getIndexName())
.setType(ESUtils.getTypeName())
.setSource(parser.map())
//设置数据的id,id唯一,如果id已存在则是修改
.setId("2")
.execute()
.actionGet(); logger.info("添加成功,"+indexResponse.status());
} catch (IOException e) {
e.printStackTrace();
}finally {
ESUtils.closeClient(client);
}
}
5.修改数据
/**
* 更新数据
*/
@Test
public void update(){
Client client = ESUtils.getClient();
Map<String,Object> map = new HashMap<String, Object>();
map.put("age",56);
map.put("name","李四");
map.put("weight",69);
map.put("married",false);
UpdateResponse updateResponse = client.prepareUpdate()
.setIndex(ESUtils.getIndexName())
.setType(ESUtils.getTypeName())
.setDoc(map)
//要更新的数据的id
.setId("2").execute().actionGet(); logger.info("更新成功,"+updateResponse.status());
ESUtils.closeClient(client);
}
6.删除数据
/**
* 根据id删除数据
*/
@Test
public void delete(){
//获取连接
Client client = ESUtils.getClient();
//删除数据
DeleteResponse deleteResponse = client.prepareDelete()
.setIndex(ESUtils.getIndexName())
.setType(ESUtils.getTypeName())
//设置id
.setId("1")
.execute().actionGet(); logger.info("删除成功,"+deleteResponse.status());
//关闭连接
ESUtils.closeClient(client);
}
7.根据id查询数据
/**
* 根据id查询数据
*/
@Test
public void select(){
//获取连接
Client client = ESUtils.getClient();
//查询数据
GetResponse getResponse = client.prepareGet()
.setIndex(ESUtils.getIndexName())
.setType(ESUtils.getTypeName())
.setId("1")
.execute().actionGet();
//数据
logger.info("data:="+getResponse.getSourceAsString());
//关闭连接
ESUtils.closeClient(client);
}
8.全文查询
/**
* 全文检索数据
*/
@Test
public void search(){
//获取连接
Client client = ESUtils.getClient();
//设置查询条件
QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("ipad");
//查询
SearchResponse response = client.prepareSearch(ESUtils.getIndexName())
.setQuery(queryBuilder)
//设置分页
.setFrom(0).setSize(60)
.execute().actionGet();
//获取查询结果
SearchHits shs = response.getHits();
logger.info("查询数据条数:"+shs.getTotalHits());
for (SearchHit hit:shs.getHits()) {
logger.info(hit.getSourceAsString());
}
//关闭连接
ESUtils.closeClient(client);
}
9.批量插入数据
//批量插入数据
@Test
public void testBulk(){
//获取连接
Client client = ESUtils.getClient();
//批量接口
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
User user = new User();
//设置对象的属性
for (int i=3;i<40002;i++){
user.setName("user_"+UUID.randomUUID().toString().replace("-","").substring(0,6));
SecureRandom random = new SecureRandom();
long l = Math.abs(random.nextLong());
user.setWeight(l);
user.setMarried(l%3==0?true:false);
user.setAge(l%2==0?28:82);
//将对象转为json字符串
Gson gson = new Gson();
String json = gson.toJson(user);
XContentParser parser = null;
//数据转换
try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
} catch (IOException e) {
e.printStackTrace();
}
IndexRequestBuilder ir = null;
//预插入数据
try {
ir = client.prepareIndex("userindex","tweet",String.valueOf(i)).setSource(parser.map());
} catch (IOException e) {
e.printStackTrace();
}
bulkRequestBuilder.add(ir);
}
//批量插入
BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); //判断插入是否失败
if(bulkResponse.hasFailures()){
//失败原因
logger.info(bulkResponse.buildFailureMessage());
logger.info("失败");
}
}
10.按指定条件查询
/**
* 按指定属性查询
*/
@Test
public void testSearchByFiled(){
//获取连接
Client client = ESUtils.getClient();
//查询过滤器
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//查询条件 查询字段名称 和 值
boolQueryBuilder.must(QueryBuilders.termQuery("age",28));
boolQueryBuilder.must(QueryBuilders.termQuery("married",true));
//知道要查询的索引和type
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ESUtils.getIndexName()).setTypes(ESUtils.getTypeName());
//设置查询的分页
searchRequestBuilder.setQuery(boolQueryBuilder).setFrom(0).setSize(10000);
//执行查询
SearchResponse response = searchRequestBuilder.execute().actionGet();
//查询结果数
logger.info("数据总数:"+response.getHits().totalHits);
//变量查询结果集
for (SearchHit hit:response.getHits()) {
logger.info(hit.getSourceAsString());
}
//关闭连接
ESUtils.closeClient(client);
}
11.聚合数据
/**
* 按指定条件聚合数据(分类统计)
*/
@Test
public void testFacets(){
//获取连接
Client client = ESUtils.getClient();
//设置待查询的索引和类型
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ESUtils
.getIndexName()).setTypes(ESUtils.getTypeName());
//设置查询条件
TermsAggregationBuilder builder = AggregationBuilders
//查询结果集名称
.terms("marry")
//要查询的属性名称
.field("age");
searchRequestBuilder.addAggregation(builder);
//执行查询
SearchResponse response = searchRequestBuilder.execute().actionGet();
//根据设置的名字获取查询结果
Terms terms = response.getAggregations().get("marry");
List<? extends Terms.Bucket> buckets = terms.getBuckets();
Map<String,String> map = new HashMap<String, String>();
//聚合统计数量
for (Terms.Bucket bucket :buckets){
String key = bucket.getKeyAsString();
map.put(key,bucket.getDocCount()+"");
}
//输出查询数量
for (Map.Entry<String,String> entry : map.entrySet()){
logger.info(entry.getKey()+":"+entry.getValue());
}
//关闭连接
ESUtils.closeClient(client);
}
12.删除索引
/**
* 删除索引
*/
@Test
public void testDelete(){
//获取连接
Client client = ESUtils.getClient();
//判断索引是否存在
IndicesExistsRequest existsRequest = new IndicesExistsRequest(ESUtils.getIndexName());
IndicesExistsResponse existsResponse = client.admin().indices().exists(existsRequest).actionGet();
if(existsResponse.isExists()){
//删除
DeleteIndexResponse deleteIndexResponse = client.admin().indices().prepareDelete(ESUtils.getIndexName()).execute().actionGet();
if(deleteIndexResponse.isAcknowledged()){
logger.info("删除成功");
}else{
logger.info("删除失败");
}
}else {
logger.info("索引 "+ESUtils.getIndexName()+" 不存在");
}
//关闭连接
ESUtils.closeClient(client);
}