大数据之flink容错机制

一、概念理解

1、State状态

Flink实时计算程序为了保证计算过程中,出现异常可以容错,就要将中间的计算结果数据存储起来,这些中间数据就叫做State。

2、StateBackEnd

用来保存State的存储后端就叫做StateBackEnd,默认是保存在JobManager的内存中,也可以保存的本地文件系统或HDFS这样的分布式文件系统

3、CheckPointing

Flink实时计算为了容错,可以将中间数据定期保存到起来,这种定期触发保存中间结果的机制叫CheckPointing

二、重启策略

设置重启策略:固定间隔、失败率、无限重启

package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//设置重启策略
public class RestartStrategyDemo1 {

    public static void main(String[] args) throws Exception{

        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置重启策略
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(3)));  //30秒内不能达到3次,每次重启延迟时间为3秒
        //开启checkpoint
        env.enableCheckpointing(10000); //如果开启checkpoint,默认的重启策略是无限重启

        //创建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出现异常了!!!!!");
                    }
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //Transformation结束s
        //调用Sink
        summed.print();

        //启动执行
        env.execute("StreamingWordCount");

    }
}

三、

上一篇:目标检测之R-CNN/Fast R-CNN/Faster R-CNN


下一篇:python生成指定长度的随机值