keyBy: grouping by multiple fields

If keyBy needs to group by more than one field, how should the code be written?

1. Using tuples

The keyBy source code shows that when the elements are tuples, you can pass in several field indices:

public KeyedStream<T, Tuple> keyBy(int... fields) {
    if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
        // Array element type: the indices select array positions
        return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
    } else {
        // Any other type (e.g. a Tuple): the indices select tuple fields
        return keyBy(new ExpressionKeys<>(fields, getType()));
    }
}

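import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> list = Arrays.asList("neo,male,1000", "tom,female,800", "neo,female,300", "tom,female,600");
        DataStreamSource<String> streamSource = env.fromCollection(list);
        streamSource.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String line) throws Exception {
                String[] split = line.split(",");
                return new Tuple3<>(split[0], split[1], Integer.valueOf(split[2]));
            }
        }).keyBy(0, 1).sum(2).print(); // group by fields 0 and 1 (name, gender), sum field 2
        env.execute(Test.class.getSimpleName());
    }
}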

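Output (the "N>" prefix is the index, starting at 1, of the parallel print subtask that emitted the line):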

3> (neo,male,1000)
2> (neo,female,300)
6> (tom,female,600)
6> (tom,female,1400)
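To see what keyBy(0, 1).sum(2) computes, here is a minimal plain-Java sketch with no Flink dependency. It is an illustration under simplifying assumptions, not Flink's implementation: records are processed one at a time in list order, the hypothetical helper keyOf builds the composite key from field positions 0 and 1 (a List<String>, whose equality is element-wise, stands in for Flink's Tuple key), and the hypothetical rollingSum emits a running total for every incoming record, the way sum(2) does on a KeyedStream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TupleKeySketch {
    // Builds a composite key from the given field positions, analogous to keyBy(0, 1)
    static List<String> keyOf(String[] fields, int... positions) {
        List<String> key = new ArrayList<>();
        for (int p : positions) {
            key.add(fields[p]);
        }
        return key; // List equality is element-wise, so equal field values give equal keys
    }

    // Emits one running total per record for that record's key, analogous to sum(2)
    static List<String> rollingSum(List<String> lines) {
        Map<List<String>, Integer> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split(",");
            int total = totals.merge(keyOf(f, 0, 1), Integer.valueOf(f[2]), Integer::sum);
            out.add("(" + f[0] + "," + f[1] + "," + total + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("neo,male,1000", "tom,female,800", "neo,female,300", "tom,female,600");
        rollingSum(input).forEach(System.out::println);
    }
}
```

Run in list order, this prints (neo,male,1000), (tom,female,800), (neo,female,300) and (tom,female,1400). In the Flink run above, the tom/female group emitted 600 and then 1400, which means the 600 record reached the sum operator before the 800 one: the parallel map subtasks do not preserve the input order across records.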

2. Using a bean (POJO)

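Note: when using a bean, you have to pay attention to how Flink classifies its type (see my previous post, "Flink Data Types and Serialization": https://blog.csdn.net/zuodaoyong/article/details/103836224). For a bean, keyBy and sum can only refer to fields by name if Flink recognizes the class as a POJO: the class must be public (standalone or a static nested class), it must have a public no-argument constructor, and every field must either be public or have a public getter and setter.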
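Flink's documentation lists the conditions a class must meet to be treated as a POJO: it is public (standalone or a static nested class), it has a public no-argument constructor, and every non-static, non-transient field is either public and non-final or reachable through a public getter and setter. The sketch below approximates these rules with plain Java reflection, so you can see what they mean for a bean like Person. It is a simplified, assumption-based check and not Flink's TypeExtractor (which, among other things, also inspects superclass fields and may accept other getter naming variants); PojoCheckSketch, looksLikeFlinkPojo, GoodPerson and NoDefaultCtor are hypothetical names. Inside a real Flink project, a more reliable check is whether TypeInformation.of(Person.class) yields a PojoTypeInfo rather than a GenericTypeInfo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheckSketch {
    // Simplified approximation of Flink's POJO rules; NOT Flink's own TypeExtractor logic
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // The class must be public; a nested class must also be static
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // There must be a public no-argument constructor
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) return false;
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Each non-static, non-transient declared field must be public and non-final,
        // or have a public getX()/setX(...) pair (superclass fields are not checked here)
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) continue;
            String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix) || !hasPublicMethod(clazz, "set" + suffix, f.getType())) {
                return false;
            }
        }
        return true;
    }

    // Class.getMethod only finds public methods, so a hit means a public method exists
    static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Same shape as the article's Person (trimmed to two fields): should pass
    public static class GoodPerson {
        private String name;
        private Integer salary;
        public GoodPerson() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getSalary() { return salary; }
        public void setSalary(Integer salary) { this.salary = salary; }
    }

    // No no-argument constructor: should fail
    public static class NoDefaultCtor {
        public String name;
        public NoDefaultCtor(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(GoodPerson.class));    // true
        System.out.println(looksLikeFlinkPojo(NoDefaultCtor.class)); // false
    }
}
```

The article's Person class, with its public no-argument constructor and getter/setter pairs, satisfies the same simplified rules as GoodPerson.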

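For a bean, keyBy also has an overload that takes field names instead of indices:

public KeyedStream<T, Tuple> keyBy(String... fields) {
    return keyBy(new ExpressionKeys<>(fields, getType()));
}

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> list = Arrays.asList("neo,male,1000", "tom,female,800", "neo,female,300", "tom,female,600");
        DataStreamSource<String> streamSource = env.fromCollection(list);
        streamSource.map(new MapFunction<String, Person>() {
            @Override
            public Person map(String line) throws Exception {
                String[] split = line.split(",");
                return new Person(split[0], split[1], Integer.valueOf(split[2]));
            }
        }).keyBy("name", "gender").sum("salary").print(); // group by name and gender, sum salary
        env.execute(Test.class.getSimpleName());
    }
}

The Person bean, defined as a standalone public class in its own file (Person.java):
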
public class Person {
    private String name;
    private String gender;
    private Integer salary;

    public Person(String name, String gender, Integer salary) {
        this.name = name;
        this.gender = gender;
        this.salary = salary;
    }
    // Public no-argument constructor: required for Flink to treat Person as a POJO
    public Person(){}

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getSalary() {
        return salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", gender='" + gender + '\'' +
                ", salary=" + salary +
                '}';
    }
}

Output:

6> Person{name='tom', gender='female', salary=600}
2> Person{name='neo', gender='female', salary=300}
6> Person{name='tom', gender='female', salary=1400}
3> Person{name='neo', gender='male', salary=1000}
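Newer Flink releases deprecate the index- and name-based keyBy overloads used in this post in favour of keyBy(KeySelector), where a function you supply returns the key, for example a Tuple2 of name and gender. The following plain-Java sketch (no Flink dependency) illustrates that idea on the same four records: the lambda passed to the hypothetical rollingSalarySum plays the role of a KeySelector returning a composite key, and the hypothetical NameGender record stands in for Tuple2<String, String>. Records are processed in list order, so the emitted order can differ from the Flink output above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeySelectorSketch {
    // Trimmed stand-in for the article's Person bean (hypothetical, for illustration only)
    record Person(String name, String gender, int salary) {}

    // Hypothetical composite key, playing the role of Tuple2<String, String>
    record NameGender(String name, String gender) {}

    // Groups records by the key that keyOf extracts and emits a running salary total per record
    static <K> List<Person> rollingSalarySum(List<Person> people, Function<Person, K> keyOf) {
        Map<K, Integer> totals = new HashMap<>();
        List<Person> out = new ArrayList<>();
        for (Person p : people) {
            int total = totals.merge(keyOf.apply(p), p.salary(), Integer::sum);
            // Emit the current record with salary replaced by its key's running total
            out.add(new Person(p.name(), p.gender(), total));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("neo", "male", 1000), new Person("tom", "female", 800),
                new Person("neo", "female", 300), new Person("tom", "female", 600));
        // The lambda acts like a KeySelector that returns (name, gender)
        rollingSalarySum(people, p -> new NameGender(p.name(), p.gender()))
                .forEach(System.out::println);
    }
}
```

Passing Person::name as the selector instead would put neo's male and female records into a single group, which is exactly the difference between grouping by one field and grouping by two.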
