Tablestore:多元索引的统计聚合

前言

信息爆炸的浪潮下,单应用的数据量呈指数级增长,对海量数据进行实时分析的场景日趋广泛。从管理大量设备的监控指标,到勾勒目标用户画像,从突发新闻的舆情监控,到可视化呈现业务规律以供BI决策,都对“实时”、“快速”地分析海量数据提出更高的要求。
表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化、半结构化数据存储以及快速的查询和分析能力。除了原生的单点/多点随机查询、范围查询之外,还原生支持对数据进行统计聚合。主要包括:

  • 多元索引提供的各种聚合(Aggregation)和分组(GroupBy)API,支持快速、近实时地全范围分析。
  • GetRange聚合分组,支持Range Key范围内做统计聚合,本文不做展开。

本文对比SQL分析场景,介绍表格存储中的统计聚合功能,并着重介绍多元索引统计聚合的实现。

从SQL开始

标准SQL使用各种聚合函数对数据表中的行和列进行汇总操作。例如,SUM(求和),AVG(平均值),MAX(最大值),MIN(最小值)和 COUNT(行/列计数)等。
使用GROUP BY子句,将表记录进行归类和分组。

场景

下面举一个分析商品信息表的例子,表price_list记录了某电商平台上所有的手机信息。每一行为某店家对某款手机的报价,包含这些字段:商品id (id),商品名字 (name),商品报价(price),手机品牌(brand)和店家(seller)。下面举两个例子,来看看我们如何用SQL对表格数据进行分组和聚合,后文再讨论Tablestore中又是如何实现对应功能的。

准备数据

# 创建表
CREATE TABLE `price_list` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` char(20) NOT NULL COMMENT '商品名称',
  `price` decimal(10,2) NOT NULL COMMENT '商品报价',
  `brand` char(20) NOT NULL COMMENT '品牌',
  `seller` int(10) NOT NULL COMMENT '店家',
  PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;

# 插入数据
INSERT INTO `price_list` VALUES (NULL, 'vivo Z5x', '1298.00', 'vivo', 1);
INSERT INTO `price_list` VALUES (NULL, 'Huawei P30', '3688.00', 'Huawei', 1);
INSERT INTO `price_list` VALUES (NULL, 'Huawei P30', '3999.00', 'Huawei', 3);
INSERT INTO `price_list` VALUES (NULL, 'Huawei nova 5 Pro', '2999.00', 'Huawei', 1);
INSERT INTO `price_list` VALUES (NULL, 'iPhone XR', '5188.00', 'iPhone', 1);
INSERT INTO `price_list` VALUES (NULL, 'iPhone 8', '4688.00', 'iPhone', 1);
INSERT INTO `price_list` VALUES (NULL, 'iPhone 8', '4699.00', 'iPhone', 2);
INSERT INTO `price_list` VALUES (NULL, 'iPhone 8', '4600.00', 'iPhone', 3);
INSERT INTO `price_list` VALUES (NULL, 'OPPO K1', '1199.00', 'OPPO', 1);
INSERT INTO `price_list` VALUES (NULL, 'Redmi 7', '699.00', 'Xiaomi', 1);

例子1

查询统计所有品牌为"Huawei"的条数,SQL是这样查询的:

SELECT COUNT(id)
FROM price_list
WHERE brand='Huawei';

例子2

将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的最大值、最小值、平均值,SQL是这样查询的:

SELECT brand, MAX(price), MIN(price), AVG(price)
FROM price_list
GROUP BY brand
WHERE price > 3000.00;

下面看看Tablestore中实现上述分析需求的几种方法。

多元索引的统计聚合

在Tablestore表上建立多元索引,数据以异步方式同步到索引中,为主表提供丰富灵活的查询支持;使用多元索引提供的各种聚合(Aggregation)和分组(GroupBy)查询,可以实时快速分析全表。
一般地,多元索引的同步延迟在秒级别,大部分在10秒以内,主表的全局变化可以很快地反映到多元索引中,因此对索引的统计聚合查询,可以近实时地分析主表中的海量数据。同时,多元索引底层使用倒排索引、列式存储等多种技术,使得每次查询请求能在秒级,甚至毫秒级返回。

请求结构

和Query的关系

多元索引查询请求("search")包含以下并列的三部分——

  • search_query:查询,其内部包含——查询方式(query)、行偏移(offset),返回行数(limit),排序方式(sort)等。
  • aggregation:聚合,统计特定指标,类比SQL中的聚合函数。
  • groupby:分组,对查询结果分组,类比SQL中的group by子句。
"search": {                                //查询请求
    "search_query": {...},    //查询
  "aggregations": {...},    //对查询的结果做聚合
  "groupby": {...}                //对查询的结果做分组
}

查询必须指定search_query,而aggregation和groupby则可选。在发起统计聚合请求时,如果不指定search_query,则默认匹配所有行(MatchAll Query)。本文不展开介绍search_query,接下来详细说明aggregation和groupby的内部构造。

Aggregation(聚合)

对于聚合(Aggregation)请求,aggregations是一个aggregation的数组,数组元素aggregation表示一个特定的聚合实例。
每个aggregation由以下几部分组成:

  • 聚合名字():用户指定,在请求和响应中标识唯一聚合实例。例如,可以命名"max_price"表示一个Max聚合实例;
  • 聚合类型(type):目前支持的类型有Avg, Max, Min, Sum, Count和DistinctCount;
  • 聚合参数(param):每种聚合类型拥有自己特定的参数。例如,Max聚合的必选参数fieldName指定求最大值的索引字段名;可选字段missing为不存在该字段的索引行赋予默认值。
"aggregations": {
  "<aggregation_name>": {"type": ..., "parameter": {...}}
  [, "<aggregation_name2>": {"type": ..., "parameter": {...}}]*
}

GroupBy(分组)

对于分组(Groupby)请求,groupbys是一个groupby的数组,数组元素groupby表示一个特定的分组实例。每个groupby由以下几部分组成:

  • 分组名字():用户指定,在请求和响应中标识唯一分组实例;
  • 分组类型(type):目前支持的分组类型有GroupByField, GroupByRange, GroupByFilter和GroupByGeoDistance;
  • 分组参数(param):每种分组类型拥有自己特定的参数。例如,GroupByField分组的必选参数fieldName指定分组字段名;可选参数size指定最大可以返回多少分组。

与聚合不同,分组中可以包含子聚合或子分组,对父分组的每个结果分组再进行聚合或分组。

  • 子聚合(sub_aggregations):对每个结果分组中的索引行再次聚合。
  • 子分组(sub_groupbys):对每个结果分组中的索引行再次分组。
"groupbys": {
    "<groupby_name>": {
    "type": ...
    , "parameter": {...}
    [, "sub_aggregations": ...]
    [, "sub_groupbys": ...]
  }
}


套用多元索引统计聚合的组织结构,例1中的查询可以抽象为:

//SQL版本:
//SELECT COUNT(id)
//FROM price_list
//WHERE brand='Huawei';

"query": {                                                                                            //多元索引查询中的Query部分
  TermQuery("fieldName": "brand", "value": "Huawei")        //TermQuery查询: 字段"brand"的值为"Huawei"
},
"aggregations": {                                                                                //多元索引查询中的Aggregation部分
    "test_agg": {                                                                                    //Aggregation名称为"test_agg"
      "type": "Count",                                                                        //Aggregation类型为Count Agg
    "parameter": {"fieldName": "id"}                                        //Count Agg统计字段名为"id"的总行数
  }
}

例2中的查询可以抽象为:

//SQL版本:
//SELECT brand, MAX(price), MIN(price), AVG(price)
//FROM price_list
//GROUP BY brand
//WHERE price > 3000.00;

"query": {                                                                                            //多元索引查询中的Query部分
  RangeQuery("fieldName": "price", "greaterThan": 3000.00)    //RangeQuery查询: 字段名"price"的值>3000.00
},
"groupbys": {                                                                                        //多元索引查询中的Groupby部分
    "group_by_brand": {
        "type": "GroupByField",                                                        //Groupby类型为GroupByField
        "parameter": {"fieldName": "brand"},                        //按字段"brand"的不同取值进行分组
        "sub_aggregations": {                                                        //每个分组内的子聚合
            "max_price": {                                                            //统计每个分组内"price"字段的最大值
                "type": "Max",
                "parameter": {"fieldName": "price"}
            },
            "min_price": {                                                            //统计每个分组内"price"字段的最小值
                "type": "Min",
                "parameter": {"fieldName": "price"}
            },
            "avg_price": {                                                            //统计每个分组内"price"字段的平均值
                "type": "Avg",
                "parameter": {"fieldName": "price"}
            }
        }
    }
}

代码实现

上文的两个SQL语句,对应的多元索引实现如下——

例子1

查询统计所有品牌为"Huawei"的条数,多元索引可以这样做:

//SQL版本:
//SELECT COUNT(id)
//FROM price_list
//WHERE brand='Huawei';

public void testQueryAgg() {
    //查询品牌为"Huawei"的手机记录数
    SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
            .tableName(tableName)                               //Tablestore表名称
            .indexName(indexName)                               //多元索引名
            .returnAllColumns(true)                             //返回所有列
            .searchQuery(                                       //查询请求
                    SearchQuery.newBuilder()
                            .query(QueryBuilders.term("brand", "Huawei"))   //TermQuery查询brand为"Huawei"的行
                            .getTotalCount(false)                                               //不返回匹配条数
                            .addAggregation(AggregationBuilders.count("test_agg", "id"))    //Count Aggregation统计"id"字段的个数,Count Agg名称为"test_agg"
                            .build())
            .build();
    
    //发出查询请求,获取查询响应
    SearchResponse resp = syncClient.search(searchRequest);
    System.out.println(resp.getAggregationResults().getAsCountAggregationResult("test_agg").getValue());  //解析test_agg统计结果
}

例子2

将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的最大值、最小值、平均值,多元索引是这样做的:

//SQL版本:
//SELECT brand, MAX(price), MIN(price), AVG(price)
//FROM price_list
//GROUP BY brand
//WHERE price > 3000.00;

public void testQueryAggGroupBy() {
    //将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的最大值、最小值、平均值
    SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
            .tableName(tableName)                               //Tablestore表名称
            .indexName(indexName)                               //多元索引名
            .returnAllColumns(false)                            //返回所有列
            .searchQuery(                                       //查询请求
                    SearchQuery.newBuilder()
                            .query(QueryBuilders.range("price").greaterThan(3000.00))   //匹配价格大于3000的记录
                            .getTotalCount(true)                //返回匹配条数
                            .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand") //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                    .addSubAggregation(AggregationBuilders.max("max_price", "price"))   //分组内统计"price"字段的最大值,Max Aggregation名称为"max_price"
                                    .addSubAggregation(AggregationBuilders.min("min_price", "price"))   //分组内统计"price"字段的最小值,Min Aggregation名称为"min_price"
                                    .addSubAggregation(AggregationBuilders.avg("avg_price", "price"))   //分组内统计"price"字段的平均值,Avg Aggregation名称为"avg_price"
                            )
                            .build())
            .build();

    //发出查询请求,获取查询响应
    SearchResponse resp = syncClient.search(searchRequest);

    GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
    for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
        System.out.println("group: " + item.getKey());                          //分组键"brand"的值
        AggregationResults aggregationResults = item.getSubAggregationResults();
        System.out.println("\tmax_price: " + aggregationResults.getAsMaxAggregationResult("max_price").getValue()); //本组内"price"的最大值
        System.out.println("\tmin_price: " + aggregationResults.getAsMinAggregationResult("min_price").getValue()); //本组内"price"的最小值
        System.out.println("\tavg_price: " + aggregationResults.getAsAvgAggregationResult("avg_price").getValue()); //本组内"price"的平均值
    }
}

完整代码在这里:(Java SDK 5.3.0 开始支持多元索引统计聚合,敬请期待)
https://github.com/aliyun/tablestore-examples/tree/master/feature/AggregationAndGroupBy

注意:

  • 支持使用嵌套字段:Aggregation和GroupBy均支持指定“嵌套字段”。
  • 支持GroupBy递归:GroupBy下允许递归Aggregation/GroupBy,Aggregation则不能。

最后,用两组表格来对比SQL统计聚合及其对应的多元索引实现。

Aggregation vs. SQL

类似SQL中的聚合函数,多元索引的Aggregation查询支持结果集的汇总操作。下面表格展示了SQL聚合函数的使用,及其对应的多元索引实现。

类型 多元索引 SQL
最大值 MaxAggregation SELECT MAX(field)
FROM table
最小值 MinAggregation SELECT MIN(field)
FROM table
平均值 AvgAggregation SELECT AVG(field)
FROM table
SumAggregation SELECT SUM(field)
FROM table
(某字段存在的)行数 CountAggregation SELECT COUNT(field)
FROM table
行数 Query中SetGetTotalCount(true) SELECT COUNT(*)
FROM table
基数 DistinctCountAggregation SELECT COUNT(DISTINCT field)
FROM table

DistinctCountAggregation统计误差:
DistinctCountAggregation是一种近似聚合,不会精确计算列的基数(不重复列的数量),在文档数较多时存在一定误差。文档数达到1亿量级时,误差在2%以内。

GroupBy vs. SQL

如同SQL中的GROUP BY子句,多元索引的GroupBy查询会按照指定分组规则,将数据集合进行分组。下面表格展示了SQL分组查询,及其对应的多元索引实现。

类型 多元索引 SQL
字段值分组 GroupByField
(仅支持单字段分组)
SELECT field, COUNT(*)
FROM table
GROUP BY field;
数值范围分组 GroupByRange SELECT '[0, 1000)', count()
FROM table
WHERE field < 1000
UNION
SELECT '[1000, 3000)', count(
)
FROM table
WHERE field >= 1000 AND field < 3000
UNION
...
过滤条件分组 GroupByFilter SELECT count() FROM table WHERE ...
UNION
SELECT count(
) FROM table WHERE ...
UNION
...
地理距离分组 GroupByGeoDistance SET @origin = ST_GeomFromText('POINT(120.079924 30.132177)');  

SELECT *,ST_Distance_Sphere(ST_GeomFromText(CONCAT('POINT(',lng,' ',lat,')')), @origin) as dist
FROM table
WHERE ST_Distance_Sphere(ST_GeomFromText(CONCAT('POINT(',lng,' ',lat,')')), @origin) < 10000
ORDER BY dist
UNION
...

GroupByField统计误差:
由于多元索引采用分布式实现,汇总每个分片结果时,可能会过滤掉某些“均匀分布”但“单分片排名靠后”的结果,导致最终统计结果有微小误差。

GroupByGeoDistance vs. MySQL实现:
在老版本MySQL中,用户必须编写存储过程进行空间坐标计算;MySQL 5.7开始支持GIS特性,可以使用ST_Distance_Sphere计算空间坐标距离。使用表格存储,GroupByGeoDistance原生支持地理坐标距离计算。

最后

还有疑问,扫下面二维码,加入我们的钉钉讨论群:
Tablestore:多元索引的统计聚合

附录:Query、Agg和Groupby的组合 完整代码

public class AggregationAndGroupBy extends BaseExample {
    ...
    
    /** 不指定Query,默认使用MatchAllQuery */
    public void testAgg() {
        //查询最便宜的一款手机
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(true)                             //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .query(QueryBuilders.matchAll())   //匹配所有行
                                .getTotalCount(false)                                               //不返回匹配条数
                                .addAggregation(AggregationBuilders.min("test_agg", "price"))    //Count Aggregation统计"id"字段的个数,Count Agg名称为test_agg""
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);
        System.out.println(resp.getAggregationResults().getAsMinAggregationResult("test_agg").getValue());
    }

    public void testGroupby() {
        //按品牌分组,统计每组匹配个数
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .getTotalCount(true)                //返回匹配条数
                                .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand")) //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
        for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
            System.out.println("group: " + item.getKey());                          //分组键"brand"的值
            System.out.println("size: " + item.getRowCount());                      //本分组内的行数
        }
    }

    public void testGroupbyThenAgg() {
        //按品牌分组,统计每个品牌手机的最小值、最大值、平均值
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .getTotalCount(true)                //返回匹配条数
                                .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand")                //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                        .addSubAggregation(AggregationBuilders.max("max_price", "price"))   //分组内统计"price"字段的最大值,Max Aggregation名称为"max_price"
                                        .addSubAggregation(AggregationBuilders.min("min_price", "price"))   //分组内统计"price"字段的最小值,Min Aggregation名称为"min_price"
                                        .addSubAggregation(AggregationBuilders.avg("avg_price", "price"))   //分组内统计"price"字段的平均值,Avg Aggregation名称为"avg_price"
                                ).build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
        for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
            System.out.println("group: " + item.getKey());                          //分组键"brand"的值
            System.out.println("size: " + item.getRowCount());                      //本分组内的行数
        }
    }

    /** 显示指定Query,会先按Query过滤文档,再做后续统计聚合 */

    public void testQueryAgg() {
        //查询品牌为"Huawei"的手机记录数
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(true)                             //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .query(QueryBuilders.term("brand", "Huawei"))   //TermQuery查询brand为"Huawei"的行
                                .getTotalCount(false)                                               //不返回匹配条数
                                .addAggregation(AggregationBuilders.count("test_agg", "id"))    //Count Aggregation统计"id"字段的个数,Count Agg名称为"test_agg"
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);
        System.out.println(resp.getAggregationResults().getAsCountAggregationResult("test_agg").getValue());    //解析test_agg统计结果
    }

    public void testQueryGroupBy() {
        //将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的记录数
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .query(QueryBuilders.range("price").greaterThan(3000.00))   //匹配价格大于3000的记录
                                .getTotalCount(true)                //返回匹配条数
                                .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand")) //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
        for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
            System.out.println("group: " + item.getKey());                          //分组键"brand"的值
            System.out.println("size: " + item.getRowCount());                      //本分组内的行数
        }
    }

    public void testQueryGroupByThenAgg() {
        //将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的最大值、最小值、平均值
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .query(QueryBuilders.range("price").greaterThan(3000.00))   //匹配价格大于3000的记录
                                .getTotalCount(true)                //返回匹配条数
                                .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand") //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                        .addSubAggregation(AggregationBuilders.max("max_price", "price"))   //分组内统计"price"字段的最大值,Max Aggregation名称为"max_price"
                                        .addSubAggregation(AggregationBuilders.min("min_price", "price"))   //分组内统计"price"字段的最小值,Min Aggregation名称为"min_price"
                                        .addSubAggregation(AggregationBuilders.avg("avg_price", "price"))   //分组内统计"price"字段的平均值,Avg Aggregation名称为"avg_price"
                                )
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
        for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
            System.out.println("group: " + item.getKey());                          //分组键"brand"的值
            AggregationResults aggregationResults = item.getSubAggregationResults();
            System.out.println("\tmax_price: " + aggregationResults.getAsMaxAggregationResult("max_price").getValue()); //本组内"price"的最大值
            System.out.println("\tmin_price: " + aggregationResults.getAsMinAggregationResult("min_price").getValue()); //本组内"price"的最小值
            System.out.println("\tavg_price: " + aggregationResults.getAsAvgAggregationResult("avg_price").getValue()); //本组内"price"的平均值
        }
    }

    public void testQueryAggAndGroupBy() {
        //将价格大于3000的手机过滤出来,
        // 1. 统计均价
        // 2. 统计每个品牌"brand"的记录行数
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(                                       //查询请求
                        SearchQuery.newBuilder()
                                .query(QueryBuilders.range("price").greaterThan(3000.00))   //匹配价格大于3000的记录
                                .getTotalCount(true)                //返回匹配条数
                                .addAggregation(AggregationBuilders.avg("avg_price", "price"))  //统计所有价格"price" > 3000.00的手机均价
                                .addGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand"))   //GroupByField按"brand"不同值分组,GroupByField名称为"group_by_brand"
                                .build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFieldResult results = resp.getGroupByResults().getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
        for (GroupByFieldResultItem item : results.getGroupByFieldResultItems()) {  //每个分组
            System.out.println("group: " + item.getKey());                          //分组键"brand"的值
            System.out.println("size: " + item.getRowCount());                      //分组内行数
        }

        AggregationResult aggregationResult = resp.getAggregationResults().getAsAvgAggregationResult("avg_price");
        System.out.println("avg_price above 3000: " + ((AvgAggregationResult) aggregationResult).getValue());
    }

    /** groupby内部嵌套groupby */

    public void testQueryNestedGroupBy() {
        //将价格大于3000的手机过滤出来,按照"brand"字段进行分组,统计每个分组内的商品数
        SearchRequest searchRequest = SearchRequest.newBuilder()    //多元索引查询请求
                .tableName(tableName)                               //Tablestore表名称
                .indexName(indexName)                               //多元索引名
                .returnAllColumns(false)                            //返回所有列
                .searchQuery(
                        SearchQuery.newBuilder()
                            .getTotalCount(false)                   //不返回匹配条数
                            .addGroupBy(GroupByBuilders.groupByFilter("over_3000")
                                    .addFilter(QueryBuilders.range("price").greaterThan(3000.00))
                                    .addSubGroupBy(GroupByBuilders.groupByField("group_by_brand", "brand"))
                            ).build())
                .build();

        //发出查询请求,获取查询响应
        SearchResponse resp = syncClient.search(searchRequest);

        GroupByFilterResult results = resp.getGroupByResults().getAsGroupByFilterResult("over_3000");    //获取名字为"over_3000"的GroupByFilter结果
        for (GroupByFilterResultItem item : results.getGroupByFilterResultItems()) {  //每个filter分组,当前只有一个分组("price"字段值>3000.00)
            GroupByResults filterResult = item.getSubGroupByResults();                //获取filter分组结果
            GroupByFieldResult brandGroups = filterResult.getAsGroupByFieldResult("group_by_brand");    //获取名字为"group_by_brand"的GroupByField结果
            for (GroupByFieldResultItem brandGroup : brandGroups.getGroupByFieldResultItems()) {    //遍历"group_by_brand"每个分组
                System.out.println("group: " + brandGroup.getKey());        //分组键"brand"的值
                System.out.println("size: " + brandGroup.getRowCount());    //本分组内的行数
            }
        }
    }
    ...
}
上一篇:HEVC新技术(一):基于MVC的AMVP技术


下一篇:HEVC Study Two(基于HM14.0平台)--如何在HEVC中加入背景建模技术(二)