The full error message is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
    at GroupByWindowAggregation.main(GroupByWindowAggregation.java:44)
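Key concept to remember:
A Flink Table always has a schema, and in a streaming job a group window can only be defined on a field that the schema marks as a time attribute.
Debugging approach
Print the schema of the table by adding this line to the code:
System.out.println(orders.getSchema());
Once the fix below is in place, the output marks rowtime as an event-time attribute (*ROWTIME*):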
root
|-- user: BIGINT
|-- product: STRING
|-- amount: INT
|-- rowtime: TIMESTAMP(3) *ROWTIME*
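When several tables are involved, checking the printed schema by eye gets tedious. The following is a minimal sketch, not part of the original program: the class SchemaTimeAttributeCheck and its method rowtimeColumns are hypothetical names, and the code only assumes the printed format shown above, where an event-time attribute carries the *ROWTIME* marker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: it scans the text printed by
// System.out.println(orders.getSchema()) and is not part of the Flink API.
class SchemaTimeAttributeCheck {

    /** Returns the names of the columns whose printed type ends with the *ROWTIME* marker. */
    public static List<String> rowtimeColumns(String printedSchema) {
        List<String> columns = new ArrayList<>();
        for (String line : printedSchema.split("\\R")) {
            String trimmed = line.trim();
            int colon = trimmed.indexOf(':');
            // Column lines look like "|-- name: TYPE"; skip "root" and anything else.
            if (!trimmed.startsWith("|--") || colon <= 3) {
                continue;
            }
            if (trimmed.contains("*ROWTIME*")) {
                columns.add(trimmed.substring(3, colon).trim());
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        String printed = "root\n"
                + " |-- user: BIGINT\n"
                + " |-- product: STRING\n"
                + " |-- amount: INT\n"
                + " |-- rowtime: TIMESTAMP(3) *ROWTIME*\n";
        System.out.println(rowtimeColumns(printed)); // prints [rowtime]
    }
}
```

An empty result for a table that feeds a group window points at the same problem the ValidationException reports: no field was declared as a time attribute.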
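Solution
Declare rowtime as an event-time attribute by calling .rowtime() when converting the DataStream to a Table: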
Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());
The corresponding OrderStream class is:
// *************************************************************************
//     USER DATA TYPES
// *************************************************************************

/**
 * Simple POJO: public fields plus a public no-argument constructor.
 */
public class OrderStream {
    public Long user;
    public String product;
    public int amount;
    public Long rowtime; // event-time timestamp of the order

    public OrderStream() {
    }

    public OrderStream(Long user, String product, int amount, Long rowtime) {
        this.user = user;
        this.product = product;
        this.amount = amount;
        this.rowtime = rowtime;
    }

    @Override
    public String toString() {
        return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                ", rowtime=" + rowtime +
                '}';
    }
}
The corresponding main program is:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time so that rowtime can serve as the window time attribute.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// The timestamps are epoch seconds; the comments give the UTC+8 wall-clock time.
DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
        new OrderStream(1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
        new OrderStream(1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
        new OrderStream(3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
        new OrderStream(3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
        new OrderStream(1L, "diaper", 4, 1505528400L), // 2017-09-16 10:20:00
        new OrderStream(1L, "diaper", 4, 1505528400L)  // 2017-09-16 10:20:00
));

// Mark rowtime as the event-time attribute when converting the stream to a Table.
Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"), $("rowtime").rowtime());
// Print the schema to confirm that rowtime is shown as *ROWTIME*.
System.out.println(orders.getSchema());
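One thing to watch in the sample data: the values passed as rowtime are epoch seconds, while Flink works with event-time timestamps in epoch milliseconds. The sketch below uses only java.time to show the difference. The class name RowtimeUnitCheck is hypothetical, and the Asia/Shanghai zone is an assumption inferred from the wall-clock times in the comments of the sample data.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper, for illustration only; not part of the original program.
class RowtimeUnitCheck {

    // Assumption: the comments in the sample data are UTC+8 wall-clock times.
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    /** Reads the value as epoch milliseconds, the unit used for event-time timestamps. */
    public static LocalDateTime asMillis(long value) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(value), ZONE);
    }

    /** Reads the value as epoch seconds, the unit the sample data is written in. */
    public static LocalDateTime asSeconds(long value) {
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(value), ZONE);
    }

    public static void main(String[] args) {
        long raw = 1505529000L; // first "beer" order in the sample data

        System.out.println(asSeconds(raw));        // 2017-09-16T10:30
        System.out.println(asMillis(raw));         // 1970-01-18T18:12:09
        System.out.println(asMillis(raw * 1000L)); // 2017-09-16T10:30
    }
}
```

If the field is later used to assign event-time timestamps, multiplying the values by 1000 (for example 1505529000000L) keeps the data consistent with the dates in the comments.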