Understanding Flink window joins through a case-driven example
I. Prerequisites
- To join two data streams, both streams must be assigned the same windows; only records that fall into the same window can be joined.
- This example uses EventTime and tumbling windows; the overall API shape is sketched below.
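In DataStream API terms, both examples below follow the same shape. This is only a rough sketch with placeholder names (leftStream, rightStream, leftKeySelector, rightKeySelector, joinFunction are not defined here); the full, runnable versions follow in the next sections:

    leftStream.join(rightStream)                                   // or leftStream.coGroup(rightStream) for outer joins
        .where(leftKeySelector)                                    // key of the left stream
        .equalTo(rightKeySelector)                                 // key of the right stream
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))     // the same tumbling window for both streams
        .apply(joinFunction);                                      // called for each matching pair inside one window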
II. Data Sources
1. StreamDataSourceA
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Extends the parallel source function and declares the output type
public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    /**
     * volatile: prevents reads of this field from being optimized away.
     * It guarantees that when one thread changes the value,
     * the new value is immediately visible to the other threads (visibility).
     */
    private volatile boolean flag = true;

    // Emits the data
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        // Prepare the test data
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L),  // [50000, 60000)
                Tuple3.of("a", "2", 1000000054000L),  // [50000, 60000)
                Tuple3.of("a", "3", 1000000079900L),  // [70000, 80000)
                Tuple3.of("a", "4", 1000000115000L),  // [110000, 120000)  // watermark: 115000 - 5002 = 109998 < 109999
                Tuple3.of("b", "5", 1000000100000L),  // [100000, 110000)
                Tuple3.of("b", "6", 1000000108000L)   // [100000, 110000)
        };
        // Emit every tuple in the array
        int count = 0;
        while (flag && count < elements.length) {
            ctx.collect(Tuple3.of((String) elements[count].f0,
                    (String) elements[count].f1, (Long) elements[count].f2));
            count++;
            // Sleep 1s between records so they are emitted one by one
            Thread.sleep(1000);
        }
        //Thread.sleep(1000);
    }

    // run() keeps looping; flipping the flag exits the loop
    @Override
    public void cancel() {
        flag = false;
    }
}
2. StreamDataSourceB
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        // Dimension-like records that join with StreamDataSourceA, e.g. key "a" -> hangzhou
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L),  // [50000, 60000)
                Tuple3.of("b", "beijing", 1000000105000L),   // [100000, 110000)
        };
        int count = 0;
        while (flag && count < elements.length) {
            // Emit the record
            ctx.collect(new Tuple3<>((String) elements[count].f0,
                    (String) elements[count].f1, (long) elements[count].f2));
            count++;
            Thread.sleep(1000);
        }
        //Thread.sleep(100000000);
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
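Both sources are finite: run() returns after the last element, so Flink treats each source as finished and emits a final Long.MAX_VALUE watermark, which is what eventually fires the remaining event-time windows in this small demo. A minimal smoke test (not part of the original post; the class name is arbitrary) to verify what the two sources emit:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SourceSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Print each emitted tuple, prefixed with the source name
            env.addSource(new StreamDataSourceA()).print("A");
            env.addSource(new StreamDataSourceB()).print("B");
            env.execute("SourceSmokeTest");
        }
    }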
III. InnerJoin (keeps only the records that join)
1. Load the data sources
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkTumblingWindowsInnerJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;   // window size in seconds
        long delay = 5002L;    // allowed out-of-orderness in ms
        // watermark for ("a", "4"): 115000 - 5002 = 109998 < 109999,
        // so the [100000, 110000) window is not triggered early and the later b-records are not late

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use EventTime as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Parallelism 1 for easier testing (in production, a window fires only after the watermark
        // has advanced far enough in every subtask)
        env.setParallelism(1);

        // 1. Load the data sources
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftSource =
                env.addSource(new StreamDataSourceA()).name("SourceA");
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightSource =
                env.addSource(new StreamDataSourceB()).name("SourceB");
2. Assign timestamps and watermarks to both streams
        // Left stream elements look like ("a", "1", 1000000050000L)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element) {
                                return element.f2;
                            }
                        });

        // Right stream elements look like ("a", "hangzhou", 1000000059000L)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                rightSource.assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element) {
                                return element.f2;
                            }
                        });
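Side note: in newer Flink releases (1.11+), BoundedOutOfOrdernessTimestampExtractor and setStreamTimeCharacteristic are deprecated in favor of the WatermarkStrategy API. An equivalent assignment would look roughly like this (a sketch assuming the same leftSource and delay as above, plus imports for java.time.Duration and org.apache.flink.api.common.eventtime.WatermarkStrategy):

        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(delay))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f2));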
3. Perform the InnerJoin
- Assign the same tumbling window to both streams.
- Call apply to collect every pair of records that joins within one window.
- Write two KeySelector classes that extract the join key.
        DataStream<Tuple5<String, String, String, Long, Long>> joined = leftStream.join(rightStream)
                // fields that must be equal for the join
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                // assign tumbling windows
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>,
                        Tuple5<String, String, String, Long, Long>>() {
                    @Override
                    public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first,
                                                                           Tuple3<String, String, Long> second) throws Exception {
                        // Both records have the same key and fall into the same window,
                        // e.g. (a, 1, hangzhou, 1000000050000, 1000000059000)
                        return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
                    }
                });

        joined.print();
        env.execute("FlinkTumblingWindowsInnerJoinDemo");
    }
    // KeySelector that extracts the join key from the left stream
    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    // KeySelector that extracts the join key from the right stream
    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
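For reference: given the data above, only the key-"a" records in window [50000, 60000) and the key-"b" records in window [100000, 110000) have a partner on the right side (consistent with the left-join output listed in section IV), so the inner join should print just the four matched pairs, in some order:

    (a,1,hangzhou,1000000050000,1000000059000)
    (a,2,hangzhou,1000000054000,1000000059000)
    (b,5,beijing,1000000100000,1000000105000)
    (b,6,beijing,1000000108000,1000000105000)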
IV. LeftJoin
1. Obtain the two input streams
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkTumblingWindowsLeftJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;   // window size in seconds
        long delay = 5002L;    // allowed out-of-orderness in ms

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Obtain the two input streams
        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceA =
                env.addSource(new StreamDataSourceA()).name("SourceA");
        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceB =
                env.addSource(new StreamDataSourceB()).name("SourceB");
2. Extract timestamps and assign watermarks
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                sourceA.assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element) {
                                return element.f2;
                            }
                        });

        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                sourceB.assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element) {
                                return element.f2;
                            }
                        });
3. Perform the left join with coGroup
- ① The left join is implemented with coGroup: each coGroup call receives the records of one key within one window, and either side may be empty.
- ② Assign the same tumbling window to both streams.
- ③ Call apply to collect all data of that key within the window.
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new LeftJoin())
                .print();

        env.execute("FlinkTumblingWindowsLeftJoinDemo");

        /**
         * Expected output:
         * (a,1,hangzhou,1000000050000,1000000059000)
         * (a,2,hangzhou,1000000054000,1000000059000)
         * (a,3,null,1000000079900,-1)
         * (a,4,null,1000000115000,-1)
         * (b,5,beijing,1000000100000,1000000105000)
         * (b,6,beijing,1000000108000,1000000105000)
         */
    }
(1) The LeftJoin class used in window apply (the main logic)
- It receives the records that share a key and fall into the same window.
- coGroup uses key equality as the join condition, so after the left join some left records may have no match.
- For unmatched left records, the right-side fields are padded with placeholder (null) values.
    public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>,
            Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {

        // Receives the records that share a key and fall into the same window
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first,
                            Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {
            for (Tuple3<String, String, Long> leftElem : first) {
                boolean hadElements = false;
                // If the left record joins with the right stream, `second` is not empty
                // and the enhanced for loop below runs
                for (Tuple3<String, String, Long> rightElem : second) {
                    // Emit the matched pair
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    hadElements = true;
                }
                if (!hadElements) {
                    // No match: pad the right-side fields with placeholder values
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }
        }
    }
(2) The KeySelector classes that extract the join key
    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
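The same coGroup pattern extends naturally to a full outer join. Below is a minimal sketch of a hypothetical FullOuterJoin class (not from the original post); it would replace LeftJoin in the pipeline above and needs no extra imports because the java.util types are fully qualified:

    public static class FullOuterJoin implements CoGroupFunction<Tuple3<String, String, Long>,
            Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first,
                            Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {
            // Buffer the right side so it can be iterated once per left element
            java.util.List<Tuple3<String, String, Long>> rights = new java.util.ArrayList<>();
            for (Tuple3<String, String, Long> r : second) {
                rights.add(r);
            }
            boolean hasLeft = false;
            for (Tuple3<String, String, Long> left : first) {
                hasLeft = true;
                if (rights.isEmpty()) {
                    // Left record without a match: pad the right-side fields
                    out.collect(new Tuple5<>(left.f0, left.f1, "null", left.f2, -1L));
                } else {
                    for (Tuple3<String, String, Long> right : rights) {
                        out.collect(new Tuple5<>(left.f0, left.f1, right.f1, left.f2, right.f2));
                    }
                }
            }
            if (!hasLeft) {
                // Right records without any left match: pad the left-side fields
                for (Tuple3<String, String, Long> right : rights) {
                    out.collect(new Tuple5<>(right.f0, "null", right.f1, -1L, right.f2));
                }
            }
        }
    }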
V. Notes

    int windowSize = 10;   // window size in seconds
    long delay = 5002L;    // allowed out-of-orderness (delay) in ms

In StreamDataSourceA the elements are:

    Tuple3[] elements = new Tuple3[]{
            Tuple3.of("a", "1", 1000000050000L),  // [50000, 60000)
            Tuple3.of("a", "2", 1000000054000L),  // [50000, 60000)
            Tuple3.of("a", "3", 1000000079900L),  // [70000, 80000)
            Tuple3.of("a", "4", 1000000115000L),  // [110000, 120000)  // watermark: 115000 - 5002 = 109998 < 109999
            Tuple3.of("b", "5", 1000000100000L),  // [100000, 110000)
            Tuple3.of("b", "6", 1000000108000L)   // [100000, 110000)
    };
Given this data in StreamDataSourceA:
- If the configured delay is smaller than 5002 ms, say 5000 ms,
- then when ("a", "4", 1000000115000L) arrives, the watermark becomes 115000 - 5000 = 110000, which reaches the 109999 boundary, so the [100000, 110000) window fires.
- At that point ("b", "5", 1000000100000L) and ("b", "6", 1000000108000L) can no longer get into that window: it already fired when "a", "4" was read, so these two records count as late and "miss the bus".
- That is why the last two records are not joined in the printed output (see the trace below).
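To make the firing condition concrete: BoundedOutOfOrdernessTimestampExtractor emits watermark = (max event time seen) - delay, and an event-time window [start, end) fires once the watermark reaches end - 1. Tracing only Source A's contribution at the moment ("a", "4", 1000000115000L) arrives (last six digits shown):

    delay = 5000: watermark = 115000 - 5000 = 110000 >= 109999  ->  [100000, 110000) fires; b,5 and b,6 arrive too late
    delay = 5002: watermark = 115000 - 5002 = 109998 <  109999  ->  the window stays open; b,5 and b,6 still get in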
How to adjust:
Increase the delay, e.g. delay = 5002L; then the watermark is 115000 - 5002 = 109998 < 109999, the [100000, 110000) window is not triggered yet, and the last two records can still be received.
But this is not the best solution either.
The better approach is to capture late data separately instead of dropping it.
See the next post for a concrete implementation.