A Brief Overview of the Elasticsearch Java REST Clients

History of the ES Java Clients

Java API Client

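  • Pros: accesses data over the transport protocol, so it can use the cluster's internal performance features; performance is relatively good.
  • Cons: the client version must match the ES cluster version, and data is serialized with Java serialization, so whenever the ES cluster or the JDK is upgraded the client has to be upgraded with it.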
 
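This is the earliest client offered on the ES website, and spring-data-elasticsearch was built on it as well. It communicates over the transport interface and works by treating the web server as a node in the cluster: it discovers the cluster's data nodes (DataNode), routes requests to those nodes to fetch data, and merges the returned results inside the web server. The client has to stay closely in step with the ES cluster version, otherwise all sorts of "strange" exceptions show up. Because ES releases move quickly, upgrading every client alongside the cluster is expensive.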
 
The official site has announced that the transport client (API Client) will no longer be supported from ES 7.0 and will be formally removed in 8.0.
 

REST Client

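  • Pros: REST-style interaction, in line with ES's original design; strong cross-version compatibility.
  • Cons: lower performance than the API client.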
 
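The ES REST client uses HTTP. The client sends a request to any node in the cluster; that node routes the request to other nodes over the transport interface, gathers the results once processing is done, and returns them to the client. The load on the client is low.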
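To make the "plain HTTP to any node" idea concrete, here is a minimal sketch that builds a search request with nothing but the JDK's own java.net.http package (Java 11+). The class and method names are my own for illustration; the host, port, and index name are borrowed from the demos in this article, and a real application would normally use one of the clients discussed below instead.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: not part of any Elasticsearch client library.
final class RawRestSearchSketch {

    // Builds "<baseUrl>/<index>/_search"; any node in the cluster can serve it.
    static URI searchUri(String baseUrl, String index) {
        return URI.create(baseUrl + "/" + index + "/_search");
    }

    // Wraps a Query DSL JSON string in an HTTP POST to the _search endpoint.
    static HttpRequest searchRequest(String baseUrl, String index, String queryJson) {
        return HttpRequest.newBuilder(searchUri(baseUrl, index))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(queryJson))
                .build();
    }

    // Sends the request and returns the raw JSON body; needs a reachable node.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        HttpRequest request = searchRequest(
                "http://127.0.0.1:9200", "pangu", "{\"query\":{\"match_all\":{}}}");
        // Only prints what would be sent; call send(request) against a live cluster.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Everything past this point (parsing the response, retries, authentication) is what the dedicated REST clients add on top.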
 
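RESTful access is one of ES's features, yet ES had no REST client of its own until 5.0, and no reasonably usable one until 6.0. Before that, JestClient, a third-party client, was very widely used.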
 
This article briefly introduces three RESTful clients: Java Rest Client, Java High Level Client, and Jest. I personally recommend JestClient; details follow below.
 

Java Rest Client

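Of the three clients, this one feels the most "raw"; using it feels like using HttpClient directly. There are many request parameters, it offers almost no mapping capability, and once a request finishes you have to parse the JSON layer by layer. Most of the time a developer does not need to care about attributes such as "took", "_shards", and "timed_out", which mostly reflect cluster stability; getting at the data in _source quickly and conveniently is the main coding concern. A typical search response looks like this: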
{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": "pangu",
        "_type": "normal",
        "_id": "2088201805281551",
        "_score": 0.2876821,
        "_source": {
          "age": 8,
          "country": "UK",
          "id": "2088201805281551",
          "name": "baby"
        }
      },
      {
        "_index": "pangu",
        "_type": "normal",
        "_id": "2088201805281552",
        "_score": 0.2876821,
        "_source": {
          "age": 8,
          "country": "UK",
          "id": "2088201805281552",
          "name": "baby"
        }
      }
    ]
  }
}
 

Demo

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: RestClientDemo.java, v 0.1 2018-05-26 12:27 PM yanlei Exp $
 */
public class RestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(RestClientDemo.class);

    /**
     * Index name
     */
    private static final String INDEX_NAME = "pangu";

    /**
     * Type name
     */
    private static final String TYPE_NAME = "normal";

    private static RestClient restClient;

    static {
        restClient = RestClient.builder(new HttpHost("search.alipay.com", 9999, "http"))
            .setFailureListener(new RestClient.FailureListener() { // connection failure handling
                @Override
                public void onFailure(HttpHost host) {
                    LOGGER.error("init client error, host:{}", host);
                }
            })
            .setMaxRetryTimeoutMillis(10000) // timeout in milliseconds
            .setHttpClientConfigCallback(new HttpClientConfigCallback() { // authentication
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("example", "WnhmUwjU"));
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            })
            .build();
    }

    public boolean index(String id, String jsonString) {
        String method = "PUT";
        String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append(id)
            .toString();
        LOGGER.info("method={}, endpoint={}", method, endpoint);

        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        Response response = null;
        try {
            response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity);
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED
                || response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                LOGGER.info("index success");
                return true;
            } else {
                LOGGER.error("index error : {}", response.toString());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error, id={}, jsonString={}", id, jsonString, e);
            return false;
        }
    }

    public <T> T search(String searchString, CallbackSearch<T> callbackSearch) {
        String method = "GET";
        String endpoint = new StringBuilder().append("/").append(INDEX_NAME).append("/").append(TYPE_NAME).append("/").append("_search")
            .toString();
        LOGGER.info("method={}, endpoint={}", method, endpoint);

        HttpEntity entity = new NStringEntity(searchString, ContentType.APPLICATION_JSON);
        Response response = null;
        try {
            response = restClient.performRequest(method, endpoint, Collections.emptyMap(), entity);
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                // Extract the data from the raw JSON response
                String resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
                return callbackSearch.get(resultString);
            } else {
                LOGGER.error("search failed, error : {}", response);
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error, searchString={}", searchString, e);
            return null;
        }
    }

    public interface CallbackSearch<T> {
        T get(String responseString);
    }

    public static void main(String[] args) {
        RestClientDemo restClientDemo = new RestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281345");
        user.setName("nick");
        user.setAge(16);
        user.setCountry("USA");
        boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user));
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick")))
            .sort("id", SortOrder.DESC)
            .from(0).size(10);

        List<User> searchResult = restClientDemo.search(sourceBuilder.toString(), new CallbackSearch<List<User>>() {
            @Override
            public List<User> get(String responseString) {
                LOGGER.info("responseString={}", responseString);
                List<User> result = new ArrayList<>();
                // Walk hits.hits[*]._source by hand
                JSONObject responseObj = JSON.parseObject(responseString);
                JSONObject hits = responseObj.getJSONObject("hits");
                if (hits.getIntValue("total") != 0) {
                    JSONArray innerHits = hits.getJSONArray("hits");
                    for (int i = 0; i < innerHits.size(); i++) {
                        JSONObject innerhit = innerHits.getJSONObject(i);
                        User user = innerhit.getObject("_source", User.class);
                        result.add(user);
                    }
                }
                return result;
            }
        });
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);

        // 3. Aggregation query
        SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .aggregation(AggregationBuilders.avg("age_avg").field("age"));

        Double aggResult = restClientDemo.search(aggSearchSourceBuilder.toString(), new CallbackSearch<Double>() {
            @Override
            public Double get(String responseString) {
                LOGGER.info("responseString={}", responseString);
                JSONObject responseObj = JSON.parseObject(responseString);
                JSONObject aggregations = responseObj.getJSONObject("aggregations");
                Double result = aggregations.getJSONObject("age_avg").getDouble("value");
                return result;
            }
        });
        LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult);
    }
}
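The demos in this article all use a User class that is never shown. Judging only from the setters called in main and the fields visible in _source, it is presumably a plain JavaBean along the following lines. This is a guess at its shape, not the author's actual class; the field types and the toString format are assumptions.

```java
// Assumed shape of the User bean used by the demos (not shown in the original).
// Package-private here so the sketch fits in one file; in a real project it
// would normally be a public class in its own User.java.
class User {
    private String id;
    private String name;
    private int age;
    private String country;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }

    // The demos log the object directly, so a readable toString helps.
    @Override
    public String toString() {
        return "User{id='" + id + "', name='" + name + "', age=" + age
                + ", country='" + country + "'}";
    }
}
```

Getters and setters matter here because fastjson maps JSON properties through them by default, while Gson (used by Jest) reads the fields directly.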

Java High Level REST Client

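The 5.0 RestClient is hard to use, and the ES team seems to have noticed. The Java High Level REST Client, introduced in 6.0, is clearly much easier to work with: it provides methods such as getHits to retrieve results, so developers can focus on working with the core data.
However, the Java High Level REST Client seems to have repeated the Java API Client's mistake: it relies on many features that only exist from 6.0 on, so this version cannot be used against older ES versions (including ZSearch, which is currently on 5.3).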
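As far as I understand the official compatibility statement, the High Level client is only guaranteed to work with nodes on the same major version and an equal or newer minor version. The sketch below encodes that rule as a simple check; the class and method names are hypothetical, and the rule as written here is my reading of the docs rather than code taken from the client.

```java
// Hypothetical helper illustrating the version rule; not part of the ES client.
final class ClientCompatibilitySketch {

    // true when the node has the same major version as the client
    // and a minor version that is equal or newer.
    static boolean highLevelClientCanTalkTo(String clientVersion, String nodeVersion) {
        int[] client = majorMinor(clientVersion);
        int[] node = majorMinor(nodeVersion);
        return client[0] == node[0] && node[1] >= client[1];
    }

    // Parses "6.0.0" into {6, 0}; assumes a plain "major.minor[.patch]" string.
    private static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    public static void main(String[] args) {
        // A 6.0 client against a 5.3 cluster, the ZSearch case described above.
        System.out.println(highLevelClientCanTalkTo("6.0.0", "5.3.0"));
        System.out.println(highLevelClientCanTalkTo("6.0.0", "6.2.4"));
    }
}
```

Under this rule a 6.0 client cannot be pointed at a 5.3 cluster, which is the ZSearch situation described above.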
 
 

Demo

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: HighLevelRestClientDemo.java, v 0.1 2018-05-26 12:27 PM yanlei Exp $
 */
public class HighLevelRestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(HighLevelRestClientDemo.class);

    private static final String INDEX_NAME = "pangu";

    private static final String TYPE_NAME = "normal";

    private static RestHighLevelClient client;

    static {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("example", "WnhmUwjU"));

        RestClientBuilder builder = RestClient.builder(
            new HttpHost("127.0.0.1", 9200, "http"))
            .setFailureListener(new RestClient.FailureListener() { // connection failure handling
                @Override
                public void onFailure(HttpHost host) {
                    LOGGER.error("init client error, host:{}", host);
                }
            })
            .setMaxRetryTimeoutMillis(10000) // timeout in milliseconds
            .setHttpClientConfigCallback(new HttpClientConfigCallback() { // authentication
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            });

        client = new RestHighLevelClient(builder);
    }

    public boolean index(String id, String jsonString) {
        IndexRequest request = new IndexRequest(
            INDEX_NAME,
            TYPE_NAME,
            id);
        request.source(jsonString, XContentType.JSON);

        IndexResponse response = null;
        try {
            response = client.index(request);
            if (response.status().getStatus() == HttpStatus.SC_CREATED || response.status().getStatus() == HttpStatus.SC_OK) {
                LOGGER.info("index success");
                return true;
            } else {
                LOGGER.error("index error : {}", response.toString());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public <T> T search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch) {
        SearchRequest searchRequest = new SearchRequest()
            .indices(INDEX_NAME)
            .types(TYPE_NAME)
            .source(sourceBuilder);

        SearchResponse response = null;
        try {
            response = client.search(searchRequest);
            if (response.status().getStatus() == HttpStatus.SC_OK) {
                // Extract the data from the typed response
                return callbackSearch.get(response);
            } else {
                LOGGER.error("search failed, error : {}", response);
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public interface CallbackSearch<T> {
        T get(SearchResponse response);
    }

    public static void main(String[] args) {
        HighLevelRestClientDemo restClientDemo = new HighLevelRestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281552");
        user.setName("baby");
        user.setAge(8);
        user.setCountry("UK");
        boolean indexOK = restClientDemo.index(user.getId(), JSON.toJSONString(user));
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "baby")))
            //.sort("age", SortOrder.DESC)
            .from(0).size(10);

        List<User> searchResult = restClientDemo.search(sourceBuilder, new CallbackSearch<List<User>>() {
            @Override
            public List<User> get(SearchResponse response) {
                List<User> result = new ArrayList<>();
                // Each hit exposes its _source directly; no manual JSON walking needed
                response.getHits().forEach(hit -> {
                    User user = JSON.parseObject(hit.getSourceAsString(), User.class);
                    result.add(user);
                });
                return result;
            }
        });
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);
    }
}

JestClient

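Jest appeared alongside ES 1.*. Under the hood it uses HttpClient plus GSON for cluster access and data mapping, and many design choices in the Java High Level REST Client borrow from Jest.
In terms of ease of use for ES operations, Jest and the Java High Level REST Client are about on par, but Jest's compatibility across versions is much stronger. Although it uses GSON, which has a reputation for being slow and cannot be swapped out, its performance should be adequate for most business scenarios.
In a performance test on 18 servers (4 cores, 64 GB RAM each) holding 1.3 billion records, the cluster started to fail once QPS passed 200, so GSON is not the first thing to worry about. Note: the 200+ QPS figure was measured on a cluster without hot/cold data separation or routing.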
 

DEMO

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchResult.Hit;
import io.searchbox.core.search.aggregation.MetricAggregation;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yanlei
 * @version $Id: JestClientDemo.java, v 0.1 2018-05-23 5:45 PM yanlei Exp $
 */
public class JestClientDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(JestClientDemo.class);

    private static final String INDEX_NAME = "pangu";

    private static final String TYPE_NAME = "normal";

    private static JestClient client;

    static {
        JestClientFactory factory = new JestClientFactory();

        // Custom Gson configuration (HH is the 24-hour clock)
        Gson gson = new GsonBuilder().setDateFormat("yyyyMMdd HH:mm:ss").create();

        factory.setHttpClientConfig(new HttpClientConfig
            .Builder("http://search.alipay.com:9999") // cluster address
            .defaultCredentials("example", "WnhmUwjU") // credentials
            .gson(gson) // use the custom Gson; the default one also works
            .multiThreaded(true)
            .defaultMaxTotalConnectionPerRoute(2)
            .maxTotalConnection(10)
            .build());

        client = factory.getObject();
    }

    public boolean index(String id, Object obj) {
        Index index = new Index.Builder(obj).index(INDEX_NAME).type(TYPE_NAME).id(id).build();
        try {
            JestResult result = client.execute(index);
            if (result.isSucceeded()) {
                LOGGER.info("index success");
                return true;
            } else {
                LOGGER.error("index error : {}", result.getErrorMessage());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public boolean index(User user) {
        Index index = new Index.Builder(user).index(INDEX_NAME).type(TYPE_NAME).build();
        try {
            JestResult result = client.execute(index);
            if (result.isSucceeded()) {
                LOGGER.info("index success");
                return true;
            } else {
                LOGGER.error("index error : {}", result.getErrorMessage());
                return false;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return false;
        }
    }

    public <T> List<T> search(SearchSourceBuilder sourceBuilder, CallbackSearch<T> callbackSearch, Class<T> response) {
        Search search = new Search.Builder(sourceBuilder.toString())
            .addIndex(INDEX_NAME)
            .addType(TYPE_NAME)
            .build();

        SearchResult result = null;
        try {
            result = client.execute(search);
            if (result.isSucceeded()) {
                // Extract the data; Jest maps each _source to the given class
                List<Hit<T, Void>> hits = result.getHits(response);
                return callbackSearch.getHits(hits);
            } else {
                LOGGER.error("search failed, error : {}", result.getErrorMessage());
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public <T> T aggregation(SearchSourceBuilder sourceBuilder, CallbackAggregation<T> callbackAggregation) {
        Search search = new Search.Builder(sourceBuilder.toString())
            .addIndex(INDEX_NAME)
            .addType(TYPE_NAME)
            .build();

        SearchResult result = null;
        try {
            result = client.execute(search);
            if (result.isSucceeded()) {
                return callbackAggregation.getAgg(result.getAggregations());
            } else {
                LOGGER.error("aggregation failed, error : {}", result.getErrorMessage());
                return null;
            }
        } catch (IOException e) {
            LOGGER.error("system error", e);
            return null;
        }
    }

    public interface CallbackSearch<T> {
        List<T> getHits(List<Hit<T, Void>> hits);
    }

    public interface CallbackAggregation<T> {
        T getAgg(MetricAggregation metricAggregation);
    }

    public static void main(String[] args) {
        JestClientDemo demo = new JestClientDemo();

        // 1. Index a document
        User user = new User();
        user.setId("2088201805281205");
        user.setName("nick");
        user.setAge(18);
        user.setCountry("CHINA");
        boolean indexOK = demo.index(user);
        LOGGER.info("index param={} result={}", user, indexOK);

        // 2. Search
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("name", "nick")))
            .sort("id", SortOrder.DESC)
            .from(0).size(10);
        List<User> searchResult = demo.search(sourceBuilder, new CallbackSearch<User>() {
            @Override
            public List<User> getHits(List<Hit<User, Void>> hits) {
                List<User> userList = new ArrayList<>();
                hits.forEach(hit -> {
                    userList.add(hit.source);
                });
                return userList;
            }
        }, User.class);
        LOGGER.info("search param={} result={}", sourceBuilder, searchResult);

        // 3. Aggregation query
        SearchSourceBuilder aggSearchSourceBuilder = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .aggregation(AggregationBuilders.avg("age_avg").field("age"));
        double aggResult = demo.aggregation(aggSearchSourceBuilder, metricAggregation -> {
            double avgAge = metricAggregation.getAvgAggregation("age_avg").getAvg();
            return avgAge;
        });
        LOGGER.info("aggregation param={} result={}", aggSearchSourceBuilder, aggResult);
    }
}

Summary

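JestClient has better compatibility than the other two. The Java High Level REST Client ships with default tuning parameters, so when the versions match its performance will be better. With the release of the Java High Level REST Client, the Java Rest Client was renamed the Java Low Level REST Client, and the official site no longer recommends it either.
One last note: although the REST clients are built on RESTful requests, constructing Query DSL by hand is still fairly costly, so both the Java High Level REST Client and Jest use the Elasticsearch API client to build Query DSL. The Builder pattern is implemented well in both clients and is worth studying.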
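For readers unfamiliar with the Builder pattern mentioned here, the following is a deliberately tiny sketch of the idea: chained setters that return the builder itself, and a final build() that emits the Query DSL JSON. It is my own toy class, not how SearchSourceBuilder is implemented, and it skips JSON escaping and every query type except term.

```java
// Toy illustration of the Builder pattern for Query DSL; hypothetical class,
// unrelated to the real SearchSourceBuilder internals.
final class TermSearchBuilderSketch {
    private String field;
    private String value;
    private int from = 0;
    private int size = 10;

    // Each setter returns this so that calls can be chained.
    TermSearchBuilderSketch term(String field, String value) {
        this.field = field;
        this.value = value;
        return this;
    }

    TermSearchBuilderSketch from(int from) {
        this.from = from;
        return this;
    }

    TermSearchBuilderSketch size(int size) {
        this.size = size;
        return this;
    }

    // Assembles the JSON body; no escaping is done, so inputs must be JSON-safe.
    String build() {
        if (field == null) {
            throw new IllegalStateException("term(field, value) must be called before build()");
        }
        return "{\"from\":" + from + ",\"size\":" + size
                + ",\"query\":{\"term\":{\"" + field + "\":\"" + value + "\"}}}";
    }

    public static void main(String[] args) {
        String dsl = new TermSearchBuilderSketch().term("name", "nick").from(0).size(10).build();
        System.out.println(dsl);
    }
}
```

The real builders add type safety, nesting, and escaping on top of the same chaining idea, which is why both clients reuse them instead of concatenating JSON strings.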