Flink中Transform操作之union和connect

关于Transform中的union操作:
对两个或者两个以上相同类型的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
Flink中Transform操作之union和connect

例如:将3个通类型的流进行合并

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);

// 把多个流union在一起成为一个流, 这些流中存储的数据类型必须一样: 水乳交融
stream1
  .union(stream2)
  .union(stream3)

刚才说到union在连接时有个限制就是必须要连接同种类型的流,这种情况有很大的局限性,所以Flink又为我们提供了另外一种连接的算子Connect,它可以连接不同类型的数据,但是Connect算子不可以向Union一样进行多流连接,它只能是2条流相连.

Flink中Transform操作之union和connect

Connect算子和Union的区别最大在于:

  • connect只能连接两个数据流,union可以连接多个数据流。
  • connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  • Connect只是机械的合并在一起, 内部仍然是分离的2个流。

如下示例,将第一条流中的数据进行二次平方,将第二条流中的每一条数据开头都加上hello


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

public class Flink05_Transform_Connect {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3);
        DataStreamSource<String> stringDataStreamSource = env.fromElements("Tom", "Jack", "Marry");

        //分别执行map操作
        SingleOutputStreamOperator<Integer> IntDS = integerDataStreamSource.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * value;
            }
        });
        SingleOutputStreamOperator<String> StrDS = stringDataStreamSource.map(x -> "hello: " + x);

        //connect合流操作
        ConnectedStreams<String, Integer> connectDS = StrDS.connect(IntDS);

        SingleOutputStreamOperator<Object> res = connectDS.map(new CoMapFunction<String, Integer, Object>() {
            //分别对两条流进行操作
            @Override
            public Object map1(String value) throws Exception {
                return value;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return "二次平方后的结果为: "+ value*value;
            }
        });

        res.print();

        env.execute();


    }
}

Flink中Transform操作之union和connect

上一篇:渗透测试之sql注入验证安全与攻击性能


下一篇:数据库连接池(DBCP:为数据统一建立一个缓冲池,现在企业开发使用)