ElasticSearch的RestHighLevelClient高阶操作

ElasticSearch的RestHighLevelClient高阶操作

文章目录

1.导入es的高阶客户端

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.4.2</version>
</dependency>

2.给容器中注入RestHighLevelClient

@Configuration
public class ElasticsearchConfig {

    /**
     * 通用的请求设置项, 类似于请求头信息
     */
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

        COMMON_OPTIONS = builder.build();
    }

    /**
     * 容器注入高阶的rest访问client
     */
    @Bean
    public RestHighLevelClient elasticsearchRestHighLevelClient() {

        //final String hostname, final int port, final String scheme
        //schemeName 默认是 http, 不传也行
        HttpHost httpHost = new HttpHost("192.168.56.10", 9200);
        RestClientBuilder builder = RestClient.builder(httpHost);
        RestHighLevelClient client = new RestHighLevelClient(builder);

        return client;
    }
}

3. 自动注入高阶Client

@Autowired
private RestHighLevelClient client;

4. 测试es中保存数据

@Test
void indexData() throws IOException {
    //1.新建索引
    IndexRequest indexRequest = new IndexRequest("users");

    //2.数据的id
    indexRequest.id("1");

    //第一种保存方式, k-v形式
    //indexRequest.source("username", "张三", "age", 18, "gender", "男");

    //第二种, 保存json字符串对象的形式
    User user = new User();
    user.setUsername("张三");
    user.setAge(18);
    user.setGender("男");
    String jsonString = JSON.toJSONString(user);

    //3.构建数据 XContentType ====> 必须要指定内容类型
    indexRequest.source(jsonString, XContentType.JSON);

    //4.执行操作
    IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);

    //提取有用的响应数据
    System.out.println(index);
}

5. 测试批量保存数据

@Test
public void bulkTest() throws IOException {
    //1.创建emp集合
    List<Emp> emps = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Emp emp = new Emp();
        emp.setId(i);
        emp.setName("张益达" + i + "号");
        emp.setGender(i % 2 == 0 ? "男" : "女");
        emp.setProfession(i % 3 == 0 ? "vue开发工程师" : "安卓开发工程师");
        emps.add(emp);
    }

    //2.bulk请求
    BulkRequest bulkRequest = new BulkRequest();
    for (Emp emp : emps) {
        //3.index请求
        IndexRequest indexRequest = new IndexRequest("emp");
        indexRequest.id(emp.getId().toString());
        String jsonString = JSON.toJSONString(emp);
        indexRequest.source(jsonString, XContentType.JSON);
        //4.将index请求放入bulkRequest中
        bulkRequest.add(indexRequest);
    }

    //5.执行
    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);

    BulkItemResponse[] items = bulk.getItems();
    List<Object> collect = Arrays.stream(items).map(BulkItemResponse::getResponse).collect(Collectors.toList());
    System.out.println("返回的结果集合:" + collect);
}

@Data
class Emp {
    private Integer id;
    private String name;
    private String gender;
    private String profession;
}

6. 测试检索数据 (简单查询)

@Test
void searchData() throws IOException {

    //1.创建检索请求
    SearchRequest searchRequest = new SearchRequest();
    
    //2.指定索引
    searchRequest.indices("bank");
    
    //3.指定DSL检索条件, 构建查询语句
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("address", "Mill");
    sourceBuilder.query(matchQueryBuilder);
    sourceBuilder.from(0);
    sourceBuilder.size(4);

    //按照年龄的值分布进行聚合
    TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age");
    sourceBuilder.aggregation(ageAgg);
    //计算平均薪资
    AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
    sourceBuilder.aggregation(balanceAvg);

    //4.sourceBuilder放入请求中
    searchRequest.source(sourceBuilder);

    //5.执行检索, 拿到响应
    SearchResponse searchResponse = client.search(searchRequest, ElasticsearchConfig.COMMON_OPTIONS);

    System.out.println("检索条件 " + sourceBuilder.toString());
    System.out.println(searchResponse.toString());

    SearchHits hits = searchResponse.getHits();
    //真正命中的所有记录
    SearchHit[] searchHits = hits.getHits();
    for (SearchHit hit : searchHits) {
        /**
             * "_index": "bank",
             *         "_type": "account",
             *         "_id": "970",
             *         "_score": 5.4032025,
             *         "_source":
             */
        String sourceAsString = hit.getSourceAsString();
        Account account = JSON.parseObject(sourceAsString, Account.class);
        System.out.println(account);
    }

    //获取响应结果的聚合
    Aggregations aggregations = searchResponse.getAggregations();

    Terms ageAggRes = aggregations.get("ageAgg");
    for (Terms.Bucket bucket : ageAggRes.getBuckets()) {
        String keyAsString = bucket.getKeyAsString();
        System.out.println("年龄分布" + keyAsString + "===>有" + bucket.getDocCount() + "人");
    }

    Avg balanceAvgRes = aggregations.get("balanceAvg");
    System.out.println("平均薪资" + balanceAvgRes.getValue());
}

@Data
@ToString
static class Account {
    private int account_number;
    private int balance;
    private String firstname;
    private String lastname;
    private int age;
    private String gender;
    private String address;
    private String employer;
    private String email;
    private String city;
    private String state;
}

7. 先按照年龄聚合, 再计算每个年龄段的人的平均薪资

@Test
public void searchAgeWithBalance() throws IOException {
    //1.创建检索请求
    SearchRequest searchRequest = new SearchRequest();
    //2.指定索引
    searchRequest.indices("bank");
    //3.指定DSL检索条件, 构建查询语句
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(QueryBuilders.matchAllQuery());  //查询所有

    //构造age的聚合
    TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(20);

    
	/**
     * 先按照桶的doc_count排序, doc_count相同, 在根据key排序
     */
    //方法一: 按照count升序排列
    //ageAgg.order(BucketOrder.count(true));
    //按照key降序排列
    //ageAgg.order(BucketOrder.key(false));

    //方法二: 使用BucketOrder.compound方法
    List<BucketOrder> orders = new ArrayList<>();
    orders.add(BucketOrder.count(true));
    orders.add(BucketOrder.key(false));
    //ageAgg.order(orders);
    BucketOrder compound = BucketOrder.compound(orders);
    ageAgg.order(compound);

    //构造年龄的薪资子聚合, 即计算相同年龄段的平均薪资
    AvgAggregationBuilder subBalanceAvg = AggregationBuilders.avg("subBalanceAvg").field("balance");
    ageAgg.subAggregation(subBalanceAvg);
    sourceBuilder.aggregation(ageAgg);

    //总的平均薪资
    AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
    sourceBuilder.aggregation(balanceAvg);

    //4.sourceBuilder放入请求中
    searchRequest.source(sourceBuilder);

    //5.执行检索, 拿到响应
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    System.out.println("检索条件 " + sourceBuilder.toString());
    System.out.println(searchResponse.toString());

    SearchHits hits = searchResponse.getHits();
    for (SearchHit hit : hits.getHits()) {
        String sourceAsString = hit.getSourceAsString();
        Account account = JSON.parseObject(sourceAsString, Account.class);
        System.out.println("account===>" + account);
    }

    System.out.println("---------------------------------");

    Aggregations aggregations = searchResponse.getAggregations();
    ParsedLongTerms ageAggTerms = aggregations.get("ageAgg");

    for (Terms.Bucket bucket : ageAggTerms.getBuckets()) {
        String keyAsString = bucket.getKeyAsString();
        long docCount = bucket.getDocCount();
        Aggregations aggregations1 = bucket.getAggregations();
        ParsedAvg subBalanceAvgRes = aggregations1.get("subBalanceAvg");
        double value = subBalanceAvgRes.getValue();
        System.out.println(keyAsString + "岁年龄人数: " + docCount + " || 平均薪资:" + value);
    }

    ParsedAvg balanceAvgTerms = aggregations.get("balanceAvg");
    double value = balanceAvgTerms.getValue();
    System.out.println("平均薪资" + value);
}

8. 查出所有年龄分布,以及这些年龄段中【男性】的平均薪资和【女性】的平均薪资以及这个年龄段的【总体平均薪资】

@Test
public void searchBalanceGroupByGender() throws IOException {
    //1.检索请求
    SearchRequest searchRequest = new SearchRequest();
    //2.索引
    searchRequest.indices("bank");
    //3.构造检索条件, DSL语句
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(QueryBuilders.matchAllQuery());  //查询所有

    //按照年龄聚合
    TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(20).order(BucketOrder.key(true));
    TermsAggregationBuilder genderAgg = AggregationBuilders.terms("genderAgg").field("gender.keyword").order(BucketOrder.key(true));
    AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");


    genderAgg.subAggregation(balanceAvg);
    ageAgg.subAggregation(genderAgg);

    ageAgg.subAggregation(balanceAvg);
    sourceBuilder.aggregation(ageAgg);

    //4.sourceBuilder放入请求中
    searchRequest.source(sourceBuilder);
    //5.执行请求
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    System.out.println("检索条件 " + sourceBuilder.toString());
    Aggregations aggregations = searchResponse.getAggregations();
    ParsedLongTerms ageAggTerms = aggregations.get("ageAgg");

    for (Terms.Bucket bucket : ageAggTerms.getBuckets()) {
        String keyAsString = bucket.getKeyAsString();
        long docCount = bucket.getDocCount();

        Aggregations bucketAggregations = bucket.getAggregations();
        ParsedStringTerms genderAggterms = bucketAggregations.get("genderAgg");

        for (Terms.Bucket genderAggtermsBucket : genderAggterms.getBuckets()) {
            String genderKey = genderAggtermsBucket.getKeyAsString();
            String gender = genderKey.equals("M") ? "男" : "女";
            long genderDocCount = genderAggtermsBucket.getDocCount();

            Aggregations genderBucketAggregations = genderAggtermsBucket.getAggregations();
            ParsedAvg genderBalanceAvg = genderBucketAggregations.get("balanceAvg");

            System.out.println(keyAsString + "岁的人数中," +gender+ "性有" + genderDocCount + "人,平均薪资是" + genderBalanceAvg.getValue());
        }

        ParsedAvg agebalanceAvg = bucketAggregations.get("balanceAvg");
        System.out.println(keyAsString + "岁的人数有: " + docCount + "  平均薪资:" + agebalanceAvg.getValue() + "\n");
    }
}
上一篇:ES高级API空指针异常


下一篇:Elasticsearch Java 入门教程之搜索常用功能 Demo (四)