Interval Join
KeyedStream,KeyedStream → DataStream #
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
.
Interval Join会将两个数据流按照相同的key,并且在其中一个流的时间范围内的数据进行join处理。通常用于把一定时间范围内相关的分组数据拉成一个宽表。我们通常可以用类似下面的表达式来使用interval Join来处理两个数据流:
key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
我们通常可以使用下面的编程模型来处理两个数据流:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
2.操作DataSet
实例如下:
public class JoinDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String,Integer>> data1=env.fromElements(
Tuple2.of("class1",100),
Tuple2.of("class1",400),
Tuple2.of("class2",200),
Tuple2.of("class2",400)
);
DataSet<Tuple2<String,Integer>> data2=env.fromElements(
Tuple2.of("class1",300),
Tuple2.of("class1",600),
Tuple2.of("class2",200),
Tuple2.of("class3",200)
);
data1.join(data2)
.where(0).equalTo(0)
.with(new JoinFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Object>() {
@Override
public Object join(Tuple2<String, Integer> tuple1,
Tuple2<String, Integer> tuple2) throws Exception {
return new String(tuple1.f0+" : "+tuple1.f1+" "+tuple2.f1);
}
}).print();
}
}
运行结果:
class1 : 100 300
class1 : 400 300
class1 : 100 600
class1 : 400 600
class2 : 200 200
class2 : 400 200
除此之外,在操作DataSet时还有很多join,如Outer Join,Flat Join等等,具体可以查看官方文档: