默认情况下,检查点被禁用。要启用检查点,请在StreamExecutionEnvironment
上调用enableCheckpointing(n)
方法,其中n是以毫秒为单位的检查点间隔。
检查点的其他参数包括:
-
exactly-once vs. at-least-once:你可以从这两种模式中选择一种模式传递给
enableCheckpointing(n)
方法。Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)。 -
检查点超时:如果检查点构造时间超过该值,则终止正在构建的检查点。
-
检查点间的最短时间:为了确保流应用程序在检查点之间能有一定程度的进展,可以设定检查点之间最短的时间。如果该值设置为5000,则下一个检查点将在上一个检查点完成后5秒钟内启动,而不管检查点持续时间和检查点间隔。注意这意味着检查点间隔参数应该永远不小于此参数。
通过先定义检查点间的最短时间,再定义检查点间隔,可以更容易地配置应用程序,因为“检查点间的最短时间”不容易受到检查点有时耗时比平均更长的事实的影响(例如,如果存放检查点的目标存储系统暂时缓慢)。
注意该值还意味着并发的检查点数为1。
-
并发的检查点数:默认情况下,系统不会在进行一个检查点时再触发另一个检查点。这能确保拓扑不会因为在检查点上耗时过多以致流处理进展缓慢。有些情况允许多个重叠的检查点是有意义的:对于有固定处理延时的pipelines(比如因为函数调用外部服务而需要一些响应时间),但仍需要做非常频繁的检查点(100毫秒)以减轻遇到错误时重新处理的代价。
当定义了“检查点间的最短时间”,就不能使用此选项。
// 使用事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // make parameters available in the web interface //配置web界面参数可用 env.getConfig.setGlobalJobParameters(params) // env.setMaxParallelism(maxParallelism) // start a checkpoint every 1min // 每隔1分钟进行启动一个检查点 env.enableCheckpointing(1 * 60000) // advanced options: // set mode to exactly-once (this is the default) // 设置模式为exactly-once (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // make sure 1min of progress happen between checkpoints // 确保检查点之间有进行30000ms的进度 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000) // checkpoints have to complete within one minute, or are discarded //检查点必须在二分钟内完成,否则将被丢弃 env.getCheckpointConfig.setCheckpointTimeout(2 * 60000) // allow only one checkpoint to be in progress at the same time // 同一时间只允许进行一个检查点 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // enable externalized checkpoints which are retained after job cancellation
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
选择状态的后端存储(State Backend)
检查点机制将数据source和数据sink的进度,window的状态以及用户定义状态http://flink.iteblog.com/dev/stream/state.html一致地存储起来以提供exactly once语义。存储检查点的位置(例如,JobManager的内存,文件系统,数据库)取决于配置的状态后端。
默认情况下,状态将保存在内存中,检查点将存储在主节点(JobManager)的内存中。 为了正确地保留大状态,Flink支持各种形式的存储和检查点状态,可以通过StreamExecutionEnvironment.setStateBackend(…)
进行设置。
参阅 state backendshttp://flink.iteblog.com/ops/state_backends.html 了解更多关于支持的状态后端(state backends)以及作业端和集群端的详细配置。
持续更新。。。。。。