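How do you write keyBy when you need to group by more than one field?
1. Using tuples
The keyBy source shows that, for tuple types, you can pass several field indexes at once: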
public KeyedStream<T, Tuple> keyBy(int... fields) {
    return !(this.getType() instanceof BasicArrayTypeInfo) && !(this.getType() instanceof PrimitiveArrayTypeInfo)
            ? this.keyBy((Keys)(new ExpressionKeys(fields, this.getType())))
            : this.keyBy((KeySelector)KeySelectorUtil.getSelectorForArray(fields, this.getType()));
}

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
List<String> list= Arrays.asList(new String[]{"neo,male,1000","tom,female,800","neo,female,300","tom,female,600"});
DataStreamSource<String> streamSource = env.fromCollection(list);
streamSource.map(new MapFunction<String, Tuple3<String,String,Integer>>() {
    @Override
    public Tuple3<String, String, Integer> map(String line) throws Exception {
        String[] split = line.split(",");
        return new Tuple3<String, String, Integer>(split[0],split[1],Integer.valueOf(split[2]));
    }
}).keyBy(0,1).sum(2).print();
env.execute(Test.class.getSimpleName());
Output (the number before > identifies the parallel subtask that printed the record):
3> (neo,male,1000)
2> (neo,female,300)
6> (tom,female,600)
6> (tom,female,1400)
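To make the semantics of keyBy(0,1).sum(2) concrete, here is a minimal plain-Java sketch that needs no Flink dependency. It is only an illustration of the idea, not how Flink implements it: the class name CompositeKeySketch and the method rollingSum are hypothetical, and the composite key is modeled as a List of the first two fields. Like the Flink job, it emits one running total per input record. Because the sketch processes records strictly in input order, the first tom,female line shows 800, whereas the parallel run above happened to process the 600 record first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeySketch {

    // Emits one line per input record: the running sum of field 2
    // for the group identified by (field 0, field 1).
    public static List<String> rollingSum(List<String> lines) {
        // Composite key -> running total. A List has value-based equals/hashCode,
        // so two records with the same name and gender land in the same group.
        Map<List<String>, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] split = line.split(",");
            List<String> key = Arrays.asList(split[0], split[1]);
            int total = totals.merge(key, Integer.parseInt(split[2]), Integer::sum);
            emitted.add("(" + split[0] + "," + split[1] + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "neo,male,1000", "tom,female,800", "neo,female,300", "tom,female,600");
        rollingSum(input).forEach(System.out::println);
    }
}
```

Grouping on the pair rather than on the name alone is what keeps (neo,male) and (neo,female) as separate totals.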
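2. Using a Bean
Note: when you use a Bean, its type matters, because the class has to be one that Flink recognizes as a POJO type (see the previous post, Flink data types and serialization: https://blog.csdn.net/zuodaoyong/article/details/103836224)
keyBy also has an overload that takes field names instead of indexes: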
public KeyedStream<T, Tuple> keyBy(String... fields) {
    return this.keyBy((Keys)(new ExpressionKeys(fields, this.getType())));
}

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
List<String> list= Arrays.asList(new String[]{"neo,male,1000","tom,female,800","neo,female,300","tom,female,600"});
DataStreamSource<String> streamSource = env.fromCollection(list);
streamSource.map(new MapFunction<String, Person>() {
    @Override
    public Person map(String line) throws Exception {
        String[] split = line.split(",");
        return new Person(split[0],split[1],Integer.valueOf(split[2]));
    }
}).keyBy("name","gender").sum("salary").print();
env.execute(Test.class.getSimpleName());

public class Person {
    private String name;
    private String gender;
    private Integer salary;

    public Person(String name, String gender, Integer salary) {
        this.name = name;
        this.gender = gender;
        this.salary = salary;
    }

    public Person(){}

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getSalary() { return salary; }
    public void setSalary(Integer salary) { this.salary = salary; }

    @Override
    public String toString() {
        return "Person{" + "name='" + name + '\'' + ", gender='" + gender + '\'' + ", salary=" + salary + '}';
    }
}
Output:
6> Person{name='tom', gender='female', salary=600}
2> Person{name='neo', gender='female', salary=300}
6> Person{name='tom', gender='female', salary=1400}
3> Person{name='neo', gender='male', salary=1000}
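On the type note at the top of this section: field-name keys such as keyBy("name","gender") only work when Flink treats the class as a POJO. The sketch below is a rough, reflection-based approximation of the commonly documented rules (public class, public no-argument constructor, every field either public or reachable through a getter and setter). It is not Flink's own type extraction logic, and the names PojoCheckSketch, looksLikePojo and NotAPojo are hypothetical. The nested Person is a trimmed copy of the bean above so the block compiles on its own.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheckSketch {

    // Trimmed copy of the Person bean: no-arg constructor plus getters/setters.
    public static class Person {
        private String name;
        private String gender;
        private Integer salary;
        public Person() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getGender() { return gender; }
        public void setGender(String gender) { this.gender = gender; }
        public Integer getSalary() { return salary; }
        public void setSalary(Integer salary) { this.salary = salary; }
    }

    // Deliberately breaks the rules: no no-arg constructor and no setter.
    public static class NotAPojo {
        private String name;
        public NotAPojo(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // Simplified check (an assumption, not Flink's exact algorithm).
    public static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields are accepted as-is in this sketch
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false; // private field without a public getter/setter pair
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Person:   " + looksLikePojo(Person.class));
        System.out.println("NotAPojo: " + looksLikePojo(NotAPojo.class));
    }
}
```

This is why the Person class above keeps the empty constructor and the full set of getters and setters even though the example itself only calls the three-argument constructor.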