Operators
-
map
DataStream → DataStream -
flatMap
DataStream → DataStream -
fliter
DataStream → DataStream -
keyBy
DataStream → KeyedStream
对数据进行分流 -
reduce
KeyedStream/WindowedStream/AllWindowedStream → DataStream
用于keyBy或者window/windowAll之后 -
window
KeyedStream → WindowedStream
用于keyBy之后 -
windowAll
DataStream → AllWindowedStream
不用于keyBy之后,此算子并行度始终为1 -
apply
WindowedStream/AllWindowedStream → DataStream -
union
DataStream* → DataStream
合并相同类型的流 -
join
DataStream,DataStream → DataStream
比较两条流中的元素,如果相等输出,否则不进行输出。dataStream.join(otherStream)
.where().equalTo()
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {…}); -
Interval Join
KeyedStream,KeyedStream → DataStream// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {…}); -
CoGroup
DataStream,DataStream → DataStream
比较两条流中的元素,如果相等则放在一起输出,否则分开输出。重点是group。dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {…}); -
Connect
DataStream,DataStream → ConnectedStream
“连接”两条数据流,并保留他们的类型(类型可以不一样)。连接允许两个流之间共享状态。DataStream someStream = //…
DataStream otherStream = //…ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
-
CoMap, CoFlatMap
ConnectedStream → DataStream
专门针对ConnectedStream流的算子connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}@Override public Boolean map2(String value) { return false; }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Override
public void flatMap1(Integer value, Collector out) {
out.collect(value.toString());
}@Override
public void flatMap2(String value, Collector out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
}); -
Iterate
DataStream → IterativeStream → ConnectedStream
一个流被分为两部分,一部分持续不断循环输出,另一部分正常输出。IterativeStream iteration = initialStream.iterate();
DataStream iterationBody = iteration.map (/do something/);
DataStream feedback = iterationBody.filter(new FilterFunction(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream output = iterationBody.filter(new FilterFunction(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});