一、概念理解
1、State状态
Flink实时计算程序为了保证计算过程中,出现异常可以容错,就要将中间的计算结果数据存储起来,这些中间数据就叫做State。
2、StateBackEnd
用来保存State的存储后端就叫做StateBackEnd,默认是保存在JobManager的内存中,也可以保存的本地文件系统或HDFS这样的分布式文件系统
3、CheckPointing
Flink实时计算为了容错,可以将中间数据定期保存到起来,这种定期触发保存中间结果的机制叫CheckPointing
二、重启策略
设置重启策略:固定间隔、失败率、无限重启
package cn._51doit.flink.day06;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
//设置重启策略
public class RestartStrategyDemo1 {
public static void main(String[] args) throws Exception{
//创建Flink流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置重启策略
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
//env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(3))); //30秒内不能达到3次,每次重启延迟时间为3秒
//开启checkpoint
env.enableCheckpointing(10000); //如果开启checkpoint,默认的重启策略是无限重启
//创建DataStream
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation开始
//调用Transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
if("error".equals(word)) {
throw new RuntimeException("出现异常了!!!!!");
}
//new Tuple2<String, Integer>(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//分组
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tp) throws Exception {
return tp.f0;
}
});
//聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
//Transformation结束s
//调用Sink
summed.print();
//启动执行
env.execute("StreamingWordCount");
}
}
三、