ES Aggregation Analysis
1. Metric aggregations
1.1 Single-value analysis
- min: returns the minimum value of the specified field
# Minimum price
{
  "size": 0,
  "aggs": {
    "min_price": { "min": { "field": "price" } }
  }
}
- max: returns the maximum value of the specified field
# Maximum price
{
  "size": 0,
  "aggs": {
    "max_price": { "max": { "field": "price" } }
  }
}
- avg: returns the average value of the specified field
# Average price
{
  "size": 0,
  "aggs": {
    "avg_price": { "avg": { "field": "price" } }
  }
}
- sum: returns the sum of the specified field
# Total of all prices
{
  "size": 0,
  "aggs": {
    "sum_price": { "sum": { "field": "price" } }
  }
}
- value_count: counts the values of a field, i.e. how many documents have a value for it
# Count the price values that are present
{
  "aggs": {
    "price_count": { "value_count": { "field": "price" } }
  }
}
- cardinality: distinct count
# Count how many different jobs exist (deduplicated on the unanalyzed keyword sub-field)
{
  "aggs": {
    "job_count": { "cardinality": { "field": "job.keyword" } }
  }
}
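The difference between value_count and cardinality is easy to mix up, so here is a minimal in-memory sketch of the two semantics. The class and method names are hypothetical and are not part of any Elasticsearch API. The sketch counts exactly, whereas Elasticsearch's cardinality is an approximate count, so results on large data sets may differ slightly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical in-memory illustration; not Elasticsearch client code. */
class CountSketch {

    /** value_count semantics: how many values are present (nulls are skipped). */
    static long valueCount(List<String> values) {
        return values.stream().filter(Objects::nonNull).count();
    }

    /** cardinality semantics: how many distinct values are present (exact here). */
    static long cardinality(List<String> values) {
        return values.stream().filter(Objects::nonNull).distinct().count();
    }

    public static void main(String[] args) {
        // One document has no job at all, two documents share the same job.
        List<String> jobs = Arrays.asList("dev", "dev", "ops", null, "qa");
        System.out.println("value_count = " + valueCount(jobs));
        System.out.println("cardinality = " + cardinality(jobs));
    }
}
```

With the sample list, value_count sees four values and cardinality sees three distinct ones.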
1.2 Multi-value analysis
- stats: returns several results at once: min, max, avg, sum, count
# Returns all of the single-value price results above in one request
{
  "aggs": {
    "price_stats": { "stats": { "field": "price" } }
  }
}
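- extended_stats: advanced statistics; returns four more results than stats: sum of squares, variance, standard deviation, and the interval of the mean plus/minus two standard deviations
{
  "aggs": {
    "extended_salary": { "extended_stats": { "field": "salary" } }
  }
}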
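To make the extra extended_stats fields concrete, the following sketch computes them over an in-memory array. It is only an illustration under stated assumptions: the class name is hypothetical, the variance is the population variance (sum of squares divided by count, minus the squared mean), and the bounds use two standard deviations as described above.

```java
/** Hypothetical in-memory illustration of the stats / extended_stats fields. */
class ExtendedStatsSketch {
    final long count;
    final double min, max, sum, avg;
    final double sumOfSquares, variance, stdDeviation, upperBound, lowerBound;

    ExtendedStatsSketch(double[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("values must not be empty");
        }
        double lo = Double.POSITIVE_INFINITY;
        double hi = Double.NEGATIVE_INFINITY;
        double s = 0;
        double sq = 0;
        for (double v : values) {
            lo = Math.min(lo, v);
            hi = Math.max(hi, v);
            s += v;
            sq += v * v;
        }
        // The five results that plain stats also returns.
        count = values.length;
        min = lo;
        max = hi;
        sum = s;
        avg = s / count;
        // The additional extended_stats results (population variance assumed).
        sumOfSquares = sq;
        variance = Math.max(0, sq / count - avg * avg);
        stdDeviation = Math.sqrt(variance);
        upperBound = avg + 2 * stdDeviation;
        lowerBound = avg - 2 * stdDeviation;
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5, the variance 4, the standard deviation 2, and the bounds are 1 and 9.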
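- percentiles: percentile statistics
# Return the salary values at the default percentiles (1, 5, 25, 50, 75, 95, 99)
{
  "aggs": {
    "percentiles_salary": { "percentiles": { "field": "salary" } }
  }
}
# Return the salary values at the specified percentiles
{
  "aggs": {
    "percentiles_salary": {
      "percentiles": { "field": "salary", "percents": [75, 99, 99.9] }
    }
  }
}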
- percentile_ranks: the percentage of values that are less than or equal to each specified value
{
  "aggs": {
    "gge_perc_rank": {
      "percentile_ranks": { "field": "price", "values": [100, 200] }
    }
  }
}
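A percentile rank is the inverse question of a percentile: given a value, what share of the data lies at or below it. The sketch below shows that definition on an in-memory array. The class and method names are hypothetical, and the calculation is exact, while Elasticsearch computes percentile ranks approximately, so its numbers can differ slightly.

```java
/** Hypothetical in-memory illustration of the percentile_ranks definition. */
class PercentileRankSketch {

    /** Returns the percentage (0 to 100) of values that are <= threshold. */
    static double percentileRank(double[] values, double threshold) {
        if (values.length == 0) {
            throw new IllegalArgumentException("values must not be empty");
        }
        long atOrBelow = 0;
        for (double v : values) {
            if (v <= threshold) {
                atOrBelow++;
            }
        }
        return 100.0 * atOrBelow / values.length;
    }

    public static void main(String[] args) {
        double[] prices = {50, 100, 150, 200, 250};
        // Mirrors "values": [100, 200] from the request above.
        System.out.println(percentileRank(prices, 100));
        System.out.println(percentileRank(prices, 200));
    }
}
```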
- top_hits: used after bucketing to fetch the top matching documents in each bucket, i.e. the document details
{
  "aggs": {
    "jobs": {
      "terms": { "field": "job.keyword", "size": 10 },
      "aggs": {
        "top_jobs": {
          "top_hits": {
            "size": 10,
            "sort": [ { "age": { "order": "desc" } } ]
          }
        }
      }
    }
  }
}
2. Bucket aggregations
2.1 Bucketing strategies
- Terms: buckets documents by the values of the specified field
{
  "size": 0,
  "aggs": {
    "terms_jobs": { "terms": { "field": "job.keyword", "size": 5 } }
  }
}
- Range: buckets documents by value ranges of the specified field
{
  "size": 0,
  "aggs": {
    "group_by_price": {
      "range": {
        "field": "price",
        "ranges": [
          { "key": "<200", "to": 200 },
          { "from": 200, "to": 400 },
          { "key": ">400", "from": 400 }
        ]
      }
    }
  }
}
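One detail of the range aggregation is worth spelling out: each range includes its from value and excludes its to value, so a price of exactly 200 falls into the middle bucket and a price of exactly 400 into the last one. The sketch below reproduces that assignment in memory. The class name is hypothetical, and the key "200.0-400.0" for the unnamed middle range is an assumption about how the default key is rendered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory illustration of range bucketing (from inclusive, to exclusive). */
class RangeBucketSketch {

    static Map<String, Long> bucketByPrice(double[] prices) {
        // Create all buckets up front so that empty ranges still appear in the result.
        Map<String, Long> buckets = new LinkedHashMap<>();
        buckets.put("<200", 0L);
        buckets.put("200.0-400.0", 0L); // assumed default key for the unnamed range
        buckets.put(">400", 0L);

        for (double p : prices) {
            String key;
            if (p < 200) {
                key = "<200";          // to = 200 is exclusive
            } else if (p < 400) {
                key = "200.0-400.0";   // from = 200 inclusive, to = 400 exclusive
            } else {
                key = ">400";          // from = 400 is inclusive
            }
            buckets.merge(key, 1L, Long::sum);
        }
        return buckets;
    }
}
```

Note that the key ">400" is only a label chosen in the request; the bucket itself also contains documents whose price equals 400.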
- Date_Range: buckets documents by date ranges of a date field
{
  "size": 0,
  "aggs": {
    "group_by_birth": {
      "date_range": {
        "field": "birth",
        "format": "yyyy",
        "ranges": [
          { "key": "before 2000", "to": "2000" },
          { "key": "2000 - 2020", "from": "2000", "to": "2020" },
          { "key": "2020 and later", "from": "2020" }
        ]
      }
    }
  }
}
- Histogram: splits the data into buckets of a fixed interval
{
  "size": 0,
  "aggs": {
    "salary_hist": {
      "histogram": {
        "field": "salary",
        "interval": 5000,
        "extended_bounds": { "min": 0, "max": 40000 }
      }
    }
  }
}
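The histogram places each value in the bucket whose key is the value rounded down to a multiple of the interval, and extended_bounds makes sure the buckets between min and max exist even when they are empty. A rough in-memory sketch of that behaviour follows. The class and method names are hypothetical, no offset is applied, and empty buckets outside the bounds are not filled in, which is a simplification.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory illustration of fixed-interval histogram bucketing. */
class HistogramSketch {

    /** Bucket key = value rounded down to a multiple of the interval. */
    static double bucketKey(double value, double interval) {
        return Math.floor(value / interval) * interval;
    }

    static Map<Double, Long> histogram(double[] values, double interval,
                                       double boundMin, double boundMax) {
        Map<Double, Long> buckets = new TreeMap<>();
        // extended_bounds: pre-create empty buckets from boundMin to boundMax.
        long first = (long) Math.floor(boundMin / interval);
        long last = (long) Math.floor(boundMax / interval);
        for (long i = first; i <= last; i++) {
            buckets.put(i * interval, 0L);
        }
        // Count every value in its bucket; values outside the bounds still get a bucket.
        for (double v : values) {
            buckets.merge(bucketKey(v, interval), 1L, Long::sum);
        }
        return buckets;
    }
}
```

With an interval of 5000 and bounds of 0 to 40000, salaries of 5000 and 7500 share the bucket keyed 5000, and nine buckets are returned in total.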
- Date_Histogram: a histogram (bar chart) over dates, commonly used for time-series analysis
{
  "size": 0,
  "aggs": {
    "by_year": {
      "date_histogram": { "field": "birth", "interval": "year", "format": "yyyy" }
    }
  }
}
2.2 Bucket + Metric
- Case 1: bucket by age range, then compute the average salary of each age range
{
  "size": 0,
  "aggs": {
    "group_by_age": {
      "range": {
        "field": "age",
        "ranges": [
          { "key": "<20", "to": 20 },
          { "key": "20 - 50", "from": 20, "to": 50 },
          { "key": ">50", "from": 50 }
        ]
      },
      "aggs": {
        "avg_salary": { "avg": { "field": "salary" } }
      }
    }
  }
}
- Case 2: buckets within buckets: first bucket by job, then split each job by age range
{
  "size": 0,
  "aggs": {
    "group_by_job": {
      "terms": { "field": "job.keyword", "size": 10 },
      "aggs": {
        "range_age": {
          "range": {
            "field": "age",
            "ranges": [
              { "key": "<20", "to": 20 },
              { "key": "20 - 50", "from": 20, "to": 50 },
              { "key": ">50", "from": 50 }
            ]
          }
        }
      }
    }
  }
}
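- Case 3: run a metric analysis inside each bucket
# Compute the salary statistics (including the average) for each kind of job
{
  "size": 0,
  "aggs": {
    "group_by_job": {
      "terms": { "field": "job.keyword", "size": 10 },
      "aggs": {
        "avg_salary": { "stats": { "field": "salary" } }
      }
    }
  }
}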
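Conceptually, a bucket aggregation with a nested metric is a group-by followed by a per-group calculation. The sketch below expresses the "group by job, then average the salary" idea with Java streams on in-memory data. The Employee type and the method name are hypothetical and only stand in for indexed documents.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical in-memory illustration of a terms bucket with an avg sub-aggregation. */
class BucketMetricSketch {

    /** Stand-in for an indexed document with a job and a salary field. */
    static final class Employee {
        final String job;
        final double salary;

        Employee(String job, double salary) {
            this.job = job;
            this.salary = salary;
        }
    }

    /** Bucket step: group by job. Metric step: average salary per group. */
    static Map<String, Double> avgSalaryByJob(List<Employee> employees) {
        return employees.stream().collect(Collectors.groupingBy(
                (Employee e) -> e.job,
                TreeMap::new,
                Collectors.averagingDouble((Employee e) -> e.salary)));
    }

    public static void main(String[] args) {
        List<Employee> employees = Arrays.asList(
                new Employee("dev", 10000),
                new Employee("dev", 20000),
                new Employee("ops", 9000));
        avgSalaryByJob(employees)
                .forEach((job, avg) -> System.out.println(job + " -> " + avg));
    }
}
```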
3. Pipeline aggregations
3.1 Parent: the result is embedded in the existing aggregation results
- Derivative
- Moving Average
- Cumulative Sum
Case 1: group by month of birth, then compute each group's average salary and the derivative of that average
{
  "size": 0,
  "aggs": {
    "group_by_birth": {
      "date_histogram": {
        "field": "birth",
        "interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "avg_salary": {
          "avg": { "field": "salary" }
        },
        "derivative_avg_salary": {
          "derivative": { "buckets_path": "avg_salary" }
        }
      }
    }
  }
}
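Parent pipeline aggregations work on the ordered series of per-bucket metric values. As a rough sketch, the derivative is the difference between a bucket's value and the previous bucket's value (the first bucket has none), and the cumulative sum is the running total. The class and method names below are hypothetical, and empty buckets (gap handling) are not modelled.

```java
/** Hypothetical in-memory illustration of parent pipeline aggregations. */
class ParentPipelineSketch {

    /** derivative[i] = values[i] - values[i - 1]; the first bucket has no derivative (null). */
    static Double[] derivative(double[] bucketValues) {
        Double[] out = new Double[bucketValues.length];
        for (int i = 1; i < bucketValues.length; i++) {
            out[i] = bucketValues[i] - bucketValues[i - 1];
        }
        return out;
    }

    /** cumulativeSum[i] = values[0] + ... + values[i]. */
    static double[] cumulativeSum(double[] bucketValues) {
        double[] out = new double[bucketValues.length];
        double running = 0;
        for (int i = 0; i < bucketValues.length; i++) {
            running += bucketValues[i];
            out[i] = running;
        }
        return out;
    }

    public static void main(String[] args) {
        // Average salary of three consecutive monthly buckets.
        double[] avgSalaryPerMonth = {8000, 9500, 9000};
        System.out.println(java.util.Arrays.toString(derivative(avgSalaryPerMonth)));
        System.out.println(java.util.Arrays.toString(cumulativeSum(avgSalaryPerMonth)));
    }
}
```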
3.2 Sibling: the result sits at the same level as the existing aggregation results
- Max/Min/Avg/Sum Bucket
- Stats/Extended Stats Bucket
- Percentiles Bucket
Case 1: group by job, compute the average salary of each group, then find the smallest of those group averages
{
  "size": 0,
  "aggs": {
    "group_by_job": {
      "terms": {
        "field": "job.keyword",
        "size": 10
      },
      "aggs": {
        "avg_salary": {
          "avg": { "field": "salary" }
        }
      }
    },
    "min_salary_by_job": {
      "min_bucket": {
        "buckets_path": "group_by_job>avg_salary"
      }
    }
  }
}
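A sibling pipeline aggregation such as min_bucket reads the metric of every bucket and produces one result next to the bucket list: the minimum value plus the keys of the buckets that hold it. The sketch below shows this on an in-memory map of bucket key to average salary. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory illustration of the min_bucket sibling pipeline aggregation. */
class MinBucketSketch {

    /** The smallest metric value across all buckets. */
    static double minValue(Map<String, Double> metricByBucket) {
        return Collections.min(metricByBucket.values());
    }

    /** The keys of all buckets whose metric equals the minimum (ties are all reported). */
    static List<String> minKeys(Map<String, Double> metricByBucket) {
        double min = minValue(metricByBucket);
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Double> entry : metricByBucket.entrySet()) {
            if (entry.getValue() == min) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }
}
```

If two jobs share the lowest average salary, both keys are returned together with the single minimum value.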
4. Java API for ES
4.1 Demonstration of terms, range, date_range and similar aggregations
- ES dependencies
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.3.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.3.2</version>
</dependency>
<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.2</version>
</dependency>
- Demo code for the aggregation part
package com.lenovo.btit.elasticsearch;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Usage of ES aggregation analysis
 *
 * @author: zangchuanlei
 * @date: 2021/7/11
 * @time: 15:18
 */
public class EsServiceBucket {

    private final RestHighLevelClient highLevelClient;

    // Marker for terms: group by the values of a field
    private static final String BUCKET_TERMS = "1";
    // Marker for range: group by numeric ranges (double, int)
    private static final String BUCKET_RANGE = "2";
    // Marker for date_range: group by date ranges
    private static final String BUCKET_DATE_RANGE = "3";

    // The client has to be supplied by the caller; without it every search would fail with a NullPointerException
    public EsServiceBucket(RestHighLevelClient highLevelClient) {
        this.highLevelClient = highLevelClient;
    }

    /**
     * @param bucketType bucket aggregation type
     * @param resultName name of the result set (aggregation name)
     * @param size       number of hits per page
     * @param indices    index names
     * @description run an ES aggregation query
     * @author zangchuanlei
     * @date 2021/7/11 16:15
     */
    public Map<String, Long> query(String bucketType, String resultName, int size, String... indices) {
        // Build the search source for the requested bucket type
        SearchSourceBuilder sourceBuilder;
        switch (bucketType) {
            case BUCKET_TERMS:
                sourceBuilder = getSourceBuilderOfTerms(resultName, size);
                break;
            case BUCKET_RANGE:
                sourceBuilder = getSourceBuilderOfRange(resultName, size);
                break;
            case BUCKET_DATE_RANGE:
                sourceBuilder = getSourceBuilderOfDateRange(resultName, size);
                break;
            default:
                sourceBuilder = new SearchSourceBuilder();
                break;
        }
        // Create the search request for the given indices
        SearchRequest searchRequest = new SearchRequest()
                .indices(indices)
                .source(sourceBuilder);
        // Execute the request and return the processed result
        return getResultMapByBucket(bucketType, searchRequest, resultName);
    }

    /**
     * @param bucketType    bucket aggregation type
     * @param searchRequest search request
     * @param resultName    name of the result set (aggregation name)
     * @description process the search response data
     * @author zangchuanlei
     * @date 2021/7/11 16:41
     */
    private Map<String, Long> getResultMapByBucket(String bucketType, SearchRequest searchRequest, String resultName) {
        // Result container: bucket key -> document count
        Map<String, Long> rtnMap = new HashMap<>();
        try {
            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (StringUtils.equals(BUCKET_TERMS, bucketType)) {
                // Grouping by field: look up the Terms aggregation by its name
                Terms aggsTerm = searchResponse.getAggregations().get(resultName);
                // Buckets produced by the grouping
                List<? extends Terms.Bucket> termBuckets = aggsTerm.getBuckets();
                termBuckets.forEach(b -> {
                    rtnMap.put(b.getKeyAsString(), b.getDocCount());
                    System.out.println("key:" + b.getKeyAsString());
                    System.out.println("count:" + b.getDocCount());
                    // Handle the sub-aggregation: print the average price of each group
                    Avg averagePrice = b.getAggregations().get("price_avg");
                    System.out.println("Average price for key = " + b.getKeyAsString() + ": " + averagePrice.getValue());
                });
            } else if (StringUtils.equals(BUCKET_RANGE, bucketType)
                    || StringUtils.equals(BUCKET_DATE_RANGE, bucketType)) {
                // Grouping by range (numeric or date)
                Range aggsRange = searchResponse.getAggregations().get(resultName);
                List<? extends Range.Bucket> rangeBuckets = aggsRange.getBuckets();
                rangeBuckets.forEach(b -> rtnMap.put(b.getKeyAsString(), b.getDocCount()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return rtnMap;
    }

    /**
     * @param resultName name of the result set (aggregation name)
     * @param size       number of hits to return
     * @description build a query that groups by product brand and computes the average price of each group
     * @author zangchuanlei
     * @date 2021/7/11 16:24
     */
    private SearchSourceBuilder getSourceBuilderOfTerms(String resultName, int size) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // step 1: group by brand
        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(resultName)
                .field("bank.keyword");
        // step 2: compute the average price of each brand
        aggregationBuilder.subAggregation(AggregationBuilders.avg("price_avg").field("price"));
        sourceBuilder.aggregation(aggregationBuilder);
        sourceBuilder.size(size).trackTotalHits(true);
        return sourceBuilder;
    }

    /**
     * @param resultName name of the result set (aggregation name)
     * @param size       number of hits to return
     * @description build an aggregation query that groups by product price range
     * @author zangchuanlei
     * @date 2021/7/11 16:52
     */
    private SearchSourceBuilder getSourceBuilderOfRange(String resultName, int size) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        RangeAggregationBuilder aggregationBuilder = AggregationBuilders.range(resultName)
                .field("price")
                // Ranges: below 1000, 1000 to 3000, 3000 and above
                .addUnboundedTo(1000)
                .addRange(1000, 3000)
                .addUnboundedFrom(3000);
        sourceBuilder.aggregation(aggregationBuilder);
        sourceBuilder.size(size).trackTotalHits(true);
        return sourceBuilder;
    }

    /**
     * @param resultName name of the result set (aggregation name)
     * @param size       number of hits to return
     * @description build an aggregation query that groups by launch date range
     * @author zangchuanlei
     * @date 2021/7/11 17:05
     */
    private SearchSourceBuilder getSourceBuilderOfDateRange(String resultName, int size) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Group by launch date
        DateRangeAggregationBuilder aggregationBuilder = AggregationBuilders.dateRange(resultName)
                .format("yyyy-MM-dd")
                .field("launch_date")
                .addUnboundedTo("before 2000-01-01", "2000-01-01")
                .addRange("2000-01-01 to 2010-12-31", "2000-01-01", "2010-12-31")
                .addUnboundedFrom("from 2010-12-31", "2010-12-31");
        sourceBuilder.aggregation(aggregationBuilder);
        sourceBuilder.size(size).trackTotalHits(true);
        return sourceBuilder;
    }
}