这里使用的Java客户端版本是5.1.2,Elasticsearch的版本号也要是5.1.2,否则一些功能可能不支持。
之前介绍过Spring Data Elasticsearch,那里也是使用了本文介绍的官方客户端,只不过Spring Data Elasticsearch是一个社区项目,更新较慢,目前支持到Elasticsearch 2.4。
一、客户端简介
你可以使用Java client来执行多种操作:
- 在一个已经存在的集群中执行标准的index, get, delete 和 search操作。
- 在一个正在运行的集群中执行管理员任务
获得一个Client
是简单的。最通用的方式是创建一个TransportClient
连接到集群。
maven依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.1.2</version>
</dependency>
二、Transport Client
TransportClient
远程连接到一个Elasticsearch集群。它并不加入集群,只是获得一个或多个初始化transport地址,并且对于每个行为以循环方式与它们通讯(尽管大多数行为将会分成两段式操作)。
// 启动时
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// 关闭时
client.close();
注意,如果你的集群名称不叫"elasticsearch",那么你必须指定它的名字:
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
Transport client具有一个集群嗅探特性,允许你动态的增加新主机或者移除老主机。当嗅探被激活时,transport client将会连接到内部的节点列表,就是通过addTransportAddress
方法构建的节点。然后client将会在这些节点上调用内部的集群状态API来发现可用的数据节点。内部的节点列表将会被这些数据节点替换。这个列表默认每5秒刷新一次。注意嗅探连接的IP地址是那些在节点的elasticsearch配置中被声明为发布的地址。
记住,上面的节点列表可能不包活原始的节点,如果这个原始节点不是一个数据节点的话。举个例子,你初始化时连接到一个主节点,当嗅探后,不会有任何请求再会进入那个主节点,而是其他任意一个数据节点。这样做的原因是避免搜索流量发送给主节点。
为了启用嗅探,设置client.transport.sniff
为true
:
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
其他transport client设置如下:
| 参数 | 描述 |
| ------- | ----- | ---- |
| client.transport.ignore_cluster_name | 当设置为true
时忽略对节点集群名称的验证(0.19.4及以后支持) |
|client.transport.ping_timeout | 等待从一个节点返回ping响应的时间,默认是5秒 |
|client.transport.nodes_sampler_interval | 采样节点列表并连接的间隔,默认是5秒 |
三、文档APIs
3.1 索引API
索引API允许你将一个JSON格式的文档添加到特定的索引中,并使它可以被搜索到。
生成JSON文档
这里有几个不同的方式来生产JSON文档:
- 人工的拼接成
String
或者使用byte[]
- 使用一个
Map
,它将会自动转换成相等的JSON - 使用第三方的类库来序列化你的对象,例如Jackson
- 使用内置的辅助工具
XContentFactory.jsonBuilder()
在内部,没种类型的结果都会转换成byte[]
。如果结果已经是byte[]
形式的话,那么会直接使用它。jsonBuilder
是高度优化的JSON生成器,会直接构造一个byte[]
。
1)自己拼接
没什么说的,根据各API的格式自己写,注意日期格式问题。
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
2)使用Map
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
3)使用第三方类库
以jackson为例。
import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
4)使用Elasticsearch辅助工具
import static org.elasticsearch.common.xcontent.XContentFactory.*;
XContentBuilder builder = jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
添加文档到索引
下面的例子将一个JSON文档添加到名为twitter,类型为tweet的索引中,其id为1。
import static org.elasticsearch.common.xcontent.XContentFactory.*;
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
.get();
另外一种方式,注意没有指定id。
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
IndexResponse response = client.prepareIndex("twitter", "tweet")
.setSource(json)
.get();
IndexResponse
对象将会给你一个报告。
// 索引名称
String _index = response.getIndex();
// 类型名称
String _type = response.getType();
// 文档ID
String _id = response.getId();
// 版本 (如果你是第一次添加这个文档你将会得到:1)
long _version = response.getVersion();
// 当前实例的状态
RestStatus status = response.status();
线程化操作
这个将文档添加到索引的API允许你将操作放在另一个线程中执行(默认的),你可以通过修改operationThreaded
的设置为false
来关闭它。
3.2 获取文档API
这个API允许你根据文档的ID获取一个JSON类型的文档。下面的例子展示的是从twitter索引的tweet类型下获得ID为1的文档。
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
String json = response.getSourceAsString();
与添加文档到索引的API类似,它默认是在另一个线程中执行获取文档操作的,下面的例子可以关闭它。
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
3.3 删除文档API
与获取API很类似,这个API允许你根据文档的ID删除一个JSON类型的文档。下面的例子展示的是从twitter索引的tweet类型下删除ID为1的文档。
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
它默认也是在另一个线程中执行删除文档操作的,下面的例子可以关闭它。
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
3.4 根据查询条件删除文档API
这个API可以根据查询的结果集来批量删除文档。
BulkIndexByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) // 查询
.source("persons") // 索引
.get(); // 执行操作
long deleted = response.getDeleted(); // 被删除的文档数量
考虑到它可能是一个耗时很长的操作,如果你想异步的进行此操作参看下面的例子:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) // 查询
.source("persons") // 索引
.execute(new ActionListener<BulkIndexByScrollResponse>() { // 监听器
@Override
public void onResponse(BulkIndexByScrollResponse response) {
long deleted = response.getDeleted(); // 被删除的文档数量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
3.5 更新文档API
1)使用UpdateRequest
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
2)使用prepareUpdate()
这种方式又有两个不同的用法。
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" , ScriptService.ScriptType.INLINE, null, null))
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.get();
注意,你不能同时提供script 和 doc
3)upsert
如果待更新文档还不存在,那么会使用upsert元素来创建一个新文档。
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
3.6 批量获得文档API
你可以根据index, type 和 id来获得多个文档。
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1")
.add("twitter", "tweet", "2", "3", "4")
.add("another", "type", "foo")
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String json = response.getSourceAsString();
}
}
3.7 bulk API
bulk API允许你在单个请求里添加或者删除多个文档。下面是一个示例用法:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
3.8 使用Bulk处理器
BulkProcessor
类提供了一个简单的接口来自动的刷新批量操作,它基于请求的数量或者请求的大小或者手动指定一个范围。
为了使用它,首先需要创建一个BulkProcessor
实例。
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, // 添加elasticsearch client
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } // 当批处理失败并且抛出一个异常时
})
.setBulkActions(10000) // 每10000个请求作为一批处理
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // 每5mb写入一批数据
.setFlushInterval(TimeValue.timeValueSeconds(5)) // 每5秒写入一批,不管请求的数量有多少
.setConcurrentRequests(1) // 请求并发的数量 0表示同时只允许1个请求执行
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // 回退策略
.build();
其中回退策略初始时会等待100ms,并且指数级增长,重试3次。要想关闭回退需要设置BackoffPolicy.noBackoff()
。
一些BulkProcessor
默认参数:
- bulkActions 1000
- bulkSize 5mb
- 没有flushInterval
- concurrentRequests 1
- backoffPolicy 等待50ms,重试8次,大致最多等待5.1秒
实例完BulkProcessor
就可以添加请求:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
使用完了后要关闭BulkProcessor
:
// 10分钟后关闭
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
// 立即关闭
bulkProcessor.close();
如果在10分钟内所有的请求执行完毕,awaitClose
方法返回true
,否则返回false
。这两个方法都会将剩下的文档写入,如果设置了flushInterval
则会禁用其他的计划写入。
测试时使用Bulk Processor
如果在测试时你想使用BulkProcessor
填充你的数据集你最好将concurrentRequests
设置为0:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// 添加你的请求
bulkProcessor.add(/* Your requests */);
// 写入剩余的请求
bulkProcessor.flush();
// 关闭
bulkProcessor.close();
// 刷新你的索引
client.admin().indices().prepareRefresh().get();
// 现在你可以开始搜索
client.prepareSearch().get();
四、Query DSL
Elasticsearch提供了一个基于JSON的Query DSL(domain specific languages)来定义查询。它由两种类型的从句组成:
1)叶子查询从句
叶子查询从句在一个指定的域里寻找指定的值。例如match, term 或者 range查询。这些查询可以单独使用。
2)复合查询从句
复合查询从句包装了其他叶子查询子句或者复合查询从句,被用在一个逻辑范式里联合多条件查询(例如:bool
或者 dis_max
查询),或者改变它们的行为(例如:constant_score
查询)。
查询构建器的工厂类是QueryBuilders
,一旦你的查询准备好后,你就可以使用搜索API了。
要想使用QueryBuilders
,你只需要将它们导入进你的类即可:
import static org.elasticsearch.index.query.QueryBuilders.*;
你可以使用QueryBuilder
对象的toString()
方法轻松的将生成的JSON查询条件打印出来。
QueryBuilder
可以用在任何接受查询条件的API里,例如count
和search
。
4.1 Match All Query
最简单的查询,匹配所有的文档,并将_score
的值设置为1.0。
QueryBuilder qb = matchAllQuery();
4.2 全文查询
高层次的全文查询通常被用来在文本域里执行全文查询并返回相关性最强的结果,比如一封电子邮件的正文里。Elasticsearch中的相关性概念非常重要,也是完全区别于传统关系型数据库的一个概念,数据库中的一条记录要么匹配要么不匹配。
这个类别下的查询有:
match query
这是执行全文查询的标准方式, 包括fuzzy matching(模糊匹配),短语或者近似查询。
multi_match query
多字段版本匹配查询
common_terms query
一个更专业化的查询,更适合一些不寻常的词语。它是stopwords的一个现代化替代。
query_string query
支持Lucene查询字符串语法协议,允许你指定AND|OR|NOT
条件和在一个单独的查询字符串里进行多字段搜索。只建议高级用户使用。
simple_query_string
一个更简单的,更稳定的query_string版本
Match Query
QueryBuilder qb = matchQuery(
"name", // 文档域的名称
"kimchy elasticsearch" // 要搜索的文本
);
Multi Match Query
QueryBuilder qb = multiMatchQuery(
"kimchy elasticsearch", // 要搜索的文本
"user", "message" // 文档域的名称
);
Common Terms Query
QueryBuilder qb = commonTermsQuery("name", // 文档域的名称
"kimchy"); // 值
Query String Query
QueryBuilder qb = queryStringQuery("+kimchy -elasticsearch"); // 要搜索的文本
Simple Query String Query
QueryBuilder qb = simpleQueryStringQuery("+kimchy -elasticsearch"); // 要搜索的文本
4.3 术语级别查询(Term level queries)
全文查询会在执行前分析查询字符串,术语级别的查询会在索引中精确匹配要查询的词语。
这些查询通常被用在结构化的数据上,比如数字、日期和一些字典表类的数据,而不是一堆很长的文本。另外,它们还允许你手工处理低等级查询。
这个组里有如下查询:
term query
在指定域里精确的查询包含指定词语的文档。
QueryBuilder qb = termQuery(
"name", // 文档域的名称
"kimchy" // 要搜索的词
);
terms query
在指定域里精确的查询包含任一指定词语的文档。
QueryBuilder qb = termsQuery("tags", // 文档域的名称
"blue", "pill"); // 要搜索的词
range query
查询指定域的值(日期、数字或者字符串)在指定范围内的文档。
QueryBuilder qb = rangeQuery("price") // 文档域的名称
.from(5) // 范围的开始
.to(10) // 范围的结束
.includeLower(true) // 包括范围的开始
.includeUpper(false); // 不包括范围的结束
exists query
查询指定域里有不是null
值的所有文档。
QueryBuilder qb = existsQuery("name"); // 文档域的名称
prefix query
查询所有指定域的值包含特定前缀的文档
QueryBuilder qb = prefixQuery(
"brand", // 文档域的名称
"heine" // 前缀
);
wildcard query
查询所有指定域的值与通配符表达式匹配的文档。支持的通配符有单字符通配符?
和多字符通配符*
QueryBuilder qb = wildcardQuery("user", "k?mc*");
regexp query
查询所有指定域的值与正则表达式匹配的文档。
QueryBuilder qb = regexpQuery(
"name.first", // 文档域的名称
"s.*y"); // 正则表达式
fuzzy query
过时的,针对字符串域,它将会被移除而且没有替代功能
查询所有指定域的值与指定的术语相近的文档。模糊性是由Levenshtein(编辑距离算法)编辑距离1或者2来衡量。
QueryBuilder qb = fuzzyQuery(
"name", // 文档域的名称
"kimzhy" // 搜索文本
);
type query
查询制定类型的所有文档
QueryBuilder qb = typeQuery("my_type");
ids query
查询指定类型与ID的文档
QueryBuilder qb = idsQuery("my_type", "type2")
.addIds("1", "4", "100");
QueryBuilder qb = idsQuery() // 类型是可选的
.addIds("1", "4", "100");
4.4 复合查询(Compound queries)
复合查询包装了其他复合或者叶子查询,用来合并它们的结果和分数,改变它们的行为,或者从查询切换到过滤器上下文。
本组有如下查询:
constant_score query
这个查询包裹了另一个查询,但是在过滤器上下文执行它。所有匹配的文档都会被赋予一个相同的_score
。
QueryBuilder qb = constantScoreQuery(
termQuery("name","kimchy") // 查询语句
)
.boost(2.0f); // 分数
bool query
默认的复合查询,具体有must
, should
, must_not
, 或者filter
从句。must
和should
从句会将它们的分数相加,越多的匹配条件分数越高。
QueryBuilder qb = boolQuery()
.must(termQuery("content", "test1")) // must query
.must(termQuery("content", "test4"))
.mustNot(termQuery("content", "test2")) // must not query
.should(termQuery("content", "test3")) // should query
.filter(termQuery("content", "test5")); // 与一般查询作用一样,只不过不参与评分
dis_max query
这种查询接受多个子查询,并且返回所有子查询的结果。与bool query不同的是,它会使用最匹配子查询的分数。
QueryBuilder qb = disMaxQuery()
.add(termQuery("name", "kimchy"))
.add(termQuery("name", "elasticsearch"))
.boost(1.2f)
.tieBreaker(0.7f);
五、搜索APIs
搜索API允许你执行一个搜索查询,并且取回查询匹配的数据,查询条件在后面的章节介绍。它可以被执行在1个或多个索引和类型上。这里有一个例子:
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders.*;
SearchResponse response = client.prepareSearch("index1", "index2")
.setTypes("type1", "type2")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("multi", "test")) // Query
.setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter
.setFrom(0).setSize(60).setExplain(true) // 分页参数
.get();
注意,所有的参数都是可选的。下面是个条件最少的搜索:
// 使用默认参数匹配整个集群所有的文档
SearchResponse response = client.prepareSearch().get();
尽管Java API定义了附加的searchType:QUERY_AND_FETCH
和 DFS_QUERY_AND_FETCH
,这些模式是内部优化用的,用户不应该在API里使用它们。
实际使用中的常见问题
我是将客户端与Spring集成后使用的。期间遇到了一些问题特此记录下。
1)找不到Log4j 2的相关方法
因为我项目本身就是Log4j 2所以不用做什么配置。用其他日志框架的可以参考这里
但是我第一次启动时提示NoSuchMethodException,后来尝试把Log4j 2的版本升高一些解决这个问题了。原来使用的是2.0.2升级到2.7。
2)failed to get node info for [#transport#-1]
Elasticsearch服务器安装好后运行起来,通过浏览器可以访问,通过HTTP的接口也正常。上网搜索后发现HTTP接口的默认端口号是9200,但是TransportClient
默认的端口号是9300。