FLINK-Exactly-once

一、具体操作 要做到 Exactly-once,我们需要开启 Flink 的检查点功能:
env.enableCheckpointing(60_000);
env.setStateBackend((StateBackend) new FsStateBackend("/tmp/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
检查点(Checkpoint)是 Flink 的故障恢复机制,同样会在下文详述。代码中,我们将状态存储方式由 MemoryStateBackend 修改为了 FsStateBackend,即使用外部文件系统,如 HDFS,来保存应用程序的中间状态,这样当 Flink JobManager 宕机时,也可以恢复过来。Flink 还支持 RocksDBStateBackend,用来存放较大的中间状态,并能支持增量的状态更新。   使用暂存点来停止和恢复脚本 当需要暂停脚本、或对程序逻辑进行修改时,我们需要用到 Flink 的暂存点机制(Savepoint)。暂存点和检查点类似,同样保存的是 Flink 各个算子的状态数据(Operator State)。不同的是,暂存点主要用于人为的脚本更替,而检查点则主要由 Flink 控制,用来实现故障恢复。flink cancel -s 命令可以在停止脚本的同时创建一个暂存点:
$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038
Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.
  具体到我们的 ETL 示例程序,暂存点中保存了当前 Kafka 队列的消费位置、正在写入的文件名等。当需要从暂存点恢复执行时,可以使用 flink run -s 传入目录位置。Flink 会从指定偏移量读取消息队列,并处理好中间结果文件,确保没有缺失或重复的数据。
flink run -s /tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
  在 YARN 上运行 要将脚本提交到 YARN 集群上运行,同样是使用 flink run 命令。首先将代码中指定文件目录的部分添加上 HDFS 前缀,如 hdfs://localhost:9000/,重新打包后执行下列命令:
$ export HADOOP_CONF_DIR=/path/to/hadoop/conf
$ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
Submitted application application_1545534487726_0001
  Flink 仪表盘会在 YARN Application Master 中运行,我们可以通过 ResourceManager 界面进入。返回的应用 ID 可以用来管理脚本,添加 -yid 参数即可:
bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726
  二、Flink 如何保证 Exactly-once 语义 Flink 实时处理程序可以分为三个部分,数据源、处理流程、以及输出。不同的数据源和输出提供了不同的语义保证,Flink 统称为 连接器。处理流程则能提供 Exactly-once 或 At-least-once 语义,需要看检查点是否开启。   实时处理与检查点 Flink 的检查点机制是基于 Chandy-Lamport 算法的:Flink 会定时在数据流中安插轻量的标记信息(Barrier),将消息流切割成一组组记录;当某个算子处理完一组记录后,就将当前状态保存为一个检查点,提交给 JobManager,该组的标记信息也会传递给下游;当末端的算子(通常是 Sink)处理完这组记录并提交检查点后,这个检查点将被标记为“已完成”;当脚本出现问题时,就会从最后一个“已完成”的检查点开始重放记录。 Stream Barrier 如果算子有多个上游,Flink 会使用一种称为“消息对齐”的机制:如果某个上游出现延迟,当前算子会停止从其它上游消费消息,直到延迟的上游赶上进度,这样就保证了算子中的状态不会包含下一批次的记录。显然,这种方式会引入额外的延迟,因此除了这种 EXACTLY_ONCE 模式,我们也可将检查点配置为 AT_LEAST_ONCE,以获得更高的吞吐量。具体方式请参考官方文档   可重放的数据源 当出错的脚本需要从上一个检查点恢复时,Flink 必须对数据进行重放,这就要求数据源支持这一功能。Kafka 是目前使用得较多的消息队列,且支持从特定位点进行消费。具体来说,FlinkKafkaConsumer 类实现了 CheckpointedFunction 接口,会在检查点中存放主题名、分区名、以及偏移量:
abstract class FlinkKafkaConsumerBase implements CheckpointedFunction {
  public void initializeState(FunctionInitializationContext context) {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
        OFFSETS_STATE_NAME,
        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
 
 
    if (context.isRestored()) {
      for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
        restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
      }
    }
  }
 
  public void snapshotState(FunctionSnapshotContext context) {
    unionOffsetStates.clear();
    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
      unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),
          kafkaTopicPartitionLongEntry.getValue()));
    }
  }
}
  当数据源算子从检查点或暂存点恢复时,我们可以在 TaskManager 的日志中看到以下信息,表明当前消费的偏移量是从算子状态中恢复出来的:
2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase
  Consumer subtask 0 will start reading 2 partitions with offsets in restored state:
    {KafkaTopicPartition{topic='flink_test', partition=1}=725,
     KafkaTopicPartition{topic='flink_test', partition=0}=721}
  恢复写入中的文件 程序运行过程中,StreamingFileSink 首先会将结果写入中间文件,以 . 开头、in-progress 结尾。这些中间文件会在符合一定条件后更名为正式文件,取决于用户配置的 RollingPolicy,默认策略是基于时间(60 秒)和基于大小(128 MB)。当脚本出错或重启时,中间文件会被直接关闭;在恢复时,由于检查点中保存了中间文件名和成功写入的长度,程序会重新打开这些文件,切割到指定长度(Truncate),然后继续写入。这样一来,文件中就不会包含检查点之后的记录了,从而实现 Exactly-once。   以 Hadoop 文件系统举例,恢复的过程是在 HadoopRecoverableFsDataOutputStream 类的构造函数中进行的。它会接收一个 HadoopFsRecoverable 类型的结构,里面包含了中间文件的路径和长度。这个对象是 BucketState 的成员,会被保存在检查点中。
HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) {
  this.tempFile = checkNotNull(recoverable.tempFile());
  truncate(fs, tempFile, recoverable.offset());
  out = fs.append(tempFile);
}
     
上一篇:消息传递的语义性


下一篇:spark stream消费kafka Exactly-once