Flink in Depth: InnerJoin, LeftJoin, and the Window Delay (Watermark Lateness) Problem


This post builds an understanding of Flink joins through a worked case.

一、Prerequisites

  • To join two data streams, both streams must be assigned the same windows; records are only joined with records inside the same window (a minimal skeleton follows this list).
  • This post uses event time and tumbling windows.
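
A minimal skeleton of the pattern used throughout this post; streamA, streamB, keyOfA, keyOfB and joinFunction are placeholders, and the runnable versions are in sections 三 and 四 below.

    // Skeleton only: key both streams on equal fields and join them per tumbling event-time window.
    streamA.join(streamB)                                           // or streamA.coGroup(streamB) for outer-join semantics
            .where(keyOfA)                                          // KeySelector for the left stream
            .equalTo(keyOfB)                                        // KeySelector for the right stream
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // both streams share the same windows
            .apply(joinFunction);                                   // invoked for every matched pair inside a window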

二、Data sources

1、StreamDataSourceA

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Extend the parallel RichParallelSourceFunction and declare the record output type
public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    /**
     * volatile guarantees visibility: when one thread updates this flag,
     * the new value is immediately visible to all other threads.
     */
    private volatile boolean flag = true;

    //Emits the test records
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        //Prepare the test data; each comment shows the tumbling window the timestamp falls into
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000)  // 115000 - 5001 = 109998‬ <= 109999
                Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  //[100000 - 110000)
        };

        // Emit every tuple in the array
        int count = 0;
        while (flag && count < elements.length) {

            ctx.collect(Tuple3.of((String) elements[count].f0,
                    (String) elements[count].f1, (Long) elements[count].f2));
            count++;

            //Sleep 1s after each record so that the records are emitted gradually
            Thread.sleep(1000);
        }
        //Thread.sleep(1000);
    }

    //cancel() flips the flag so that the while loop in run() exits
    @Override
    public void cancel() {
        flag = false;
    }
}
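
Note that run() of a RichParallelSourceFunction executes once per subtask, so with parallelism greater than 1 every subtask would emit the whole array, and a downstream operator's watermark is the minimum over its input subtasks. A sketch of pinning just the source to a single subtask (the demos below simply set the global parallelism to 1 instead; env is the StreamExecutionEnvironment used there):

    DataStream<Tuple3<String, String, Long>> a =
            env.addSource(new StreamDataSourceA()).setParallelism(1);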

2、StreamDataSourceB

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {

        // Right-side records: (key, city, event time); the comments show the window each timestamp falls into
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L), //[50000, 60000)
                Tuple3.of("b", "beijing",  1000000105000L), //[100000, 110000)
        };

        int count = 0;
        while (flag && count < elements.length) {
            //Emit the record
            ctx.collect(new Tuple3<>((String) elements[count].f0,
                    (String) elements[count].f1, (long) elements[count].f2));

            count++;
            Thread.sleep(1000);
        }

        //Thread.sleep(100000000);
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

三、InnerJoin (keeps only the records that join)

1、Load the data sources

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkTumblingWindowsInnerJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5002L;
        //with delay = 5002, the watermark after a4 is 115000 - 5002 = 109998 < 109999,
        //so the [100000, 110000) window does not fire early and the b records are not treated as late (see section 五)


        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //Parallelism is set to 1 for easier testing (in production, a window fires only after the watermarks of all subtasks have passed its end)
        env.setParallelism(1);

        // 1. Load the data sources
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftSource =
                env.addSource(new StreamDataSourceA()).name("SourceA");

        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightSource =
                env.addSource(new StreamDataSourceB()).name("SourceB");

2、Assign timestamps and watermarks to both streams

        //("a", "1", 1000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,
                        String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {

                        return element.f2;
                    }
                });

        //("a", "hangzhou", 6000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                rightSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,
                        String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

3、Perform the inner join

  • Assign tumbling windows
  • Call apply, which receives every pair of records joined inside the same window
  • Two key-selector classes (defined below) extract the join key from each stream
        DataStream<Tuple5<String, String, String, Long, Long>> joined = leftStream.join(rightStream)
                //the fields whose values must match for the join
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))  //assign tumbling windows
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String,
                        String, String, Long, Long>>() {
                    @Override
                    public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first,
                                                                           Tuple3<String, String, Long> second) throws Exception {
                        // Called for each pair of records whose keys are equal and that fall into the same window,
                        // e.g. (a, 1, hangzhou, 1000000050000, 1000000059000)
                        return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
                    }
                });

        joined.print();
        env.execute("FlinkTumblingWindowsInnerJoinDemo");
    }


    // KeySelector for the left stream: the join key is the first field (f0)
    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
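
A side note on the watermark API: BoundedOutOfOrdernessTimestampExtractor and setStreamTimeCharacteristic are deprecated in Flink 1.11+. On a newer version, a sketch of the equivalent assignment with WatermarkStrategy (assuming java.time.Duration and org.apache.flink.api.common.eventtime.WatermarkStrategy are imported) would look roughly like this:

        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                leftSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(delay))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f2));

Note that this strategy emits watermark = maxTimestamp - delay - 1, one millisecond lower than the old extractor, so the boundary arithmetic discussed in section 五 shifts by 1 ms.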

四、LeftJoin

1、Get the two input streams

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkTumblingWindowsLeftJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5002L;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Get the two input streams
        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceA =
                env.addSource(new StreamDataSourceA()).name("SourceA");

        SingleOutputStreamOperator<Tuple3<String, String, Long>> sourceB =
                env.addSource(new StreamDataSourceB()).name("SourceB");
                

2、Extract timestamps and assign watermarks

        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream =
                sourceA.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream =
                sourceB.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String,
                        Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

3、Perform the left join by calling coGroup

  • ① Call coGroup: for each window it groups the records of both streams that share the same key; either side may be empty
  • ② Assign tumbling windows
  • ③ Call apply to receive the full groups of each window
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new LeftJoin())
                .print();

        env.execute("FlinkTumblingWindowsLeftJoinDemo");
        /**
         * Expected output:
         *  (a,1,hangzhou,1000000050000,1000000059000)
         *  (a,2,hangzhou,1000000054000,1000000059000)
         *  (a,3,null,1000000079900,-1)
         *  (a,4,null,1000000115000,-1)
         *  (b,5,beijing,1000000100000,1000000105000)
         *  (b,6,beijing,1000000108000,1000000105000)
         *
         * a3 falls into window [70000, 80000) and a4 into [110000, 120000); SourceB has no
         * records in those windows, so their right side is padded with "null" and -1.
         */
    }
(1)The main logic of the LeftJoin class passed to window apply
  • It receives the records that share a key and fall into the same window
  • Because key equality is the only join condition, some left records may find no match in their window
  • When the right side has no match, placeholder (null) values are emitted for it
 public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String,
            Long>, Tuple5<String,
            String, String, Long, Long>> {

        // first and second hold the records of the two streams that share one key within one window
        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first, Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {

            for (Tuple3<String, String, Long> leftElem : first) {
                boolean hadElements = false;

                //If the left record joined something, second is non-empty and this loop emits the matches
                for (Tuple3<String, String, Long> rightElem : second) {
                    //Emit the matched pair
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2,
                            rightElem.f2));
                    hadElements = true;
                }

                if (!hadElements) {
                    //No match: pad the right side with placeholder values
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }
        }
    }
(2)The KeySelector classes that extract the join key
   public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0;
        }
    }
}
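
By symmetry, the same coGroup pattern can also express a right join or a full outer join. A minimal sketch of a full outer join function (a hypothetical FullOuterJoin class that is not part of the original demo; it could sit next to LeftJoin inside the class above):

    public static class FullOuterJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String,
            Long>, Tuple5<String, String, String, Long, Long>> {

        @Override
        public void coGroup(Iterable<Tuple3<String, String, Long>> first, Iterable<Tuple3<String, String, Long>> second,
                            Collector<Tuple5<String, String, String, Long, Long>> out) throws Exception {
            boolean leftEmpty = true;

            // The left side drives the output, padding the right side when there is no match (same as LeftJoin)
            for (Tuple3<String, String, Long> leftElem : first) {
                leftEmpty = false;
                boolean matched = false;
                for (Tuple3<String, String, Long> rightElem : second) {
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    matched = true;
                }
                if (!matched) {
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }

            // If the left side was empty, the right records are unmatched: emit them with the left side padded
            if (leftEmpty) {
                for (Tuple3<String, String, Long> rightElem : second) {
                    out.collect(new Tuple5<>(rightElem.f0, "null", rightElem.f1, -1L, rightElem.f2));
                }
            }
        }
    }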

五、Notes on the window delay


int windowSize = 10; // window size in seconds
long delay = 5002L;  // allowed out-of-orderness (watermark delay), in milliseconds


In StreamDataSourceA, the data is:
 Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000)  // 115000 - 5001 = 109998‬ <= 109999
                Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  //[100000 - 110000)
        };

Given this data:

  • If the delay is set smaller than 5002 ms, for example 5000 ms,
  • then when ("a", "4", 1000000115000L) arrives, the watermark becomes 115000 - 5000 = 110000 >= 109999, the max timestamp of window [100000, 110000) (its end minus 1 ms), so that window fires.
  • At that point ("b", "5", 1000000100000L) and ("b", "6", 1000000108000L) have not arrived yet; their window already fired when a4 was read, so they are late and miss the window.
  • That is why, with the smaller delay, the last two records never appear in the join output (the calculation is summarized in the sketch below).
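
Summarizing the calculation (dropping the common prefix of the timestamps, as the comments above do):

    // BoundedOutOfOrdernessTimestampExtractor emits: watermark = maxTimestampSeen - delay
    // An event-time tumbling window [start, end) fires once: watermark >= end - 1
    //
    // delay = 5000: after a4, watermark = 115000 - 5000 = 110000 >= 109999 -> window [100000, 110000) fires, b5/b6 arrive late
    // delay = 5002: after a4, watermark = 115000 - 5002 = 109998 <  109999 -> window stays open, b5/b6 can still join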

How to fix it:

Increase the delay, e.g. delay = 5002L. The watermark after a4 is then 115000 - 5002 = 109998 < 109999, so the [100000, 110000) window does not fire yet and the last two records can still be joined.

This is still not the best solution, though.
A better approach is to collect late records separately instead of discarding them.
The details are covered in the next post; a minimal sketch of the mechanism follows.
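
As a preview, Flink's window API offers allowedLateness plus a side output for records that are still too late. A minimal sketch on a plain keyed window (not on the join itself; lateTag and the reduce function are placeholders, and org.apache.flink.util.OutputTag must be imported):

    OutputTag<Tuple3<String, String, Long>> lateTag =
            new OutputTag<Tuple3<String, String, Long>>("late-data") {};

    SingleOutputStreamOperator<Tuple3<String, String, Long>> windowed = leftStream
            .keyBy(new LeftSelectKey())
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(5))       // keep the window state around a little longer after it fires
            .sideOutputLateData(lateTag)            // records that are later still go to the side output instead of being dropped
            .reduce((a, b) -> a);                   // placeholder aggregation: keep the first record per key and window

    windowed.getSideOutput(lateTag).print("late");  // collect the late records separately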
