FLINK基础(103): DS算子与窗口(14)多流转换算子(5) Interval Join

Interval Join

KeyedStream,KeyedStream → DataStream #

Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

Interval Join会将两个数据流按照相同的key,并且在其中一个流的时间范围内的数据进行join处理。通常用于把一定时间范围内相关的分组数据拉成一个宽表。我们通常可以用类似下面的表达式来使用interval Join来处理两个数据流:

key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

FLINK基础(103): DS算子与窗口(14)多流转换算子(5) Interval Join

 

 我们通常可以使用下面的编程模型来处理两个数据流:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });

2.操作DataSet

实例如下:

public class JoinDemo {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String,Integer>> data1=env.fromElements(
                Tuple2.of("class1",100),
                Tuple2.of("class1",400),
                Tuple2.of("class2",200),
                Tuple2.of("class2",400)
        );

        DataSet<Tuple2<String,Integer>> data2=env.fromElements(
                Tuple2.of("class1",300),
                Tuple2.of("class1",600),
                Tuple2.of("class2",200),
                Tuple2.of("class3",200)
        );

        data1.join(data2)
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Object>() {

                    @Override
                    public Object join(Tuple2<String, Integer> tuple1,
                                       Tuple2<String, Integer> tuple2) throws Exception {
                        return new String(tuple1.f0+" : "+tuple1.f1+" "+tuple2.f1);
                    }
                }).print();
    }
}

运行结果:

class1 : 100 300
class1 : 400 300
class1 : 100 600
class1 : 400 600
class2 : 200 200
class2 : 400 200

除此之外,在操作DataSet时还有很多join,如Outer Join,Flat Join等等,具体可以查看官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#join

 

上一篇:grafana配置重复报警repeat_interval间隔


下一篇:前端实现文字滚动(支持横向、纵向)