关于Transform中的union操作:
对两个或者两个以上的相同类型的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
例如:将3个通类型的流进行合并
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);
// 把多个流union在一起成为一个流, 这些流中存储的数据类型必须一样: 水乳交融
stream1
.union(stream2)
.union(stream3)
刚才说到union在连接时有个限制就是必须要连接同种类型的流,这种情况有很大的局限性,所以Flink又为我们提供了另外一种连接的算子Connect,它可以连接不同类型的数据,但是Connect算子不可以向Union一样进行多流连接,它只能是2条流相连.
Connect算子和Union的区别最大在于:
- connect只能连接两个数据流,union可以连接多个数据流。
- connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
- Connect只是机械的合并在一起, 内部仍然是分离的2个流。
如下示例,将第一条流中的数据进行二次平方,将第二条流中的每一条数据开头都加上hello
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
public class Flink05_Transform_Connect {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3);
DataStreamSource<String> stringDataStreamSource = env.fromElements("Tom", "Jack", "Marry");
//分别执行map操作
SingleOutputStreamOperator<Integer> IntDS = integerDataStreamSource.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * value;
}
});
SingleOutputStreamOperator<String> StrDS = stringDataStreamSource.map(x -> "hello: " + x);
//connect合流操作
ConnectedStreams<String, Integer> connectDS = StrDS.connect(IntDS);
SingleOutputStreamOperator<Object> res = connectDS.map(new CoMapFunction<String, Integer, Object>() {
//分别对两条流进行操作
@Override
public Object map1(String value) throws Exception {
return value;
}
@Override
public Object map2(Integer value) throws Exception {
return "二次平方后的结果为: "+ value*value;
}
});
res.print();
env.execute();
}
}