JavaFlink开发的一些基本配置

默认情况下,检查点被禁用。要启用检查点,请在StreamExecutionEnvironment上调用enableCheckpointing(n)方法,其中n是以毫秒为单位的检查点间隔。

检查点的其他参数包括:

  • exactly-once vs. at-least-once:你可以从这两种模式中选择一种模式传递给enableCheckpointing(n)方法。Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)。

  • 检查点超时:如果检查点构造时间超过该值,则终止正在构建的检查点。

  • 检查点间的最短时间:为了确保流应用程序在检查点之间能有一定程度的进展,可以设定检查点之间最短的时间。如果该值设置为5000,则下一个检查点将在上一个检查点完成后5秒钟内启动,而不管检查点持续时间和检查点间隔。注意这意味着检查点间隔参数应该永远不小于此参数。

    通过先定义检查点间的最短时间,再定义检查点间隔,可以更容易地配置应用程序,因为“检查点间的最短时间”不容易受到检查点有时耗时比平均更长的事实的影响(例如,如果存放检查点的目标存储系统暂时缓慢)。

    注意该值还意味着并发的检查点数为1

  • 并发的检查点数:默认情况下,系统不会在进行一个检查点时再触发另一个检查点。这能确保拓扑不会因为在检查点上耗时过多以致流处理进展缓慢。有些情况允许多个重叠的检查点是有意义的:对于有固定处理延时的pipelines(比如因为函数调用外部服务而需要一些响应时间),但仍需要做非常频繁的检查点(100毫秒)以减轻遇到错误时重新处理的代价。

    当定义了“检查点间的最短时间”,就不能使用此选项。

// 使用事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// make parameters available in the web interface
//配置web界面参数可用
env.getConfig.setGlobalJobParameters(params)
//
env.setMaxParallelism(maxParallelism)

// start a checkpoint every 1min
 // 每隔1分钟进行启动一个检查点
env.enableCheckpointing(1 * 60000)
// advanced options:
// set mode to exactly-once (this is the default)
 // 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 1min of progress happen between checkpoints
// 确保检查点之间有进行30000ms的进度
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)
// checkpoints have to complete within one minute, or are discarded

//检查点必须在二分钟内完成,否则将被丢弃
env.getCheckpointConfig.setCheckpointTimeout(2 * 60000)
// allow only one checkpoint to be in progress at the same time
// 同一时间只允许进行一个检查点
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// enable externalized checkpoints which are retained after job cancellation
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

选择状态的后端存储(State Backend)

检查点机制将数据source和数据sink的进度,window的状态以及用户定义状态JavaFlink开发的一些基本配置http://flink.iteblog.com/dev/stream/state.html一致地存储起来以提供exactly once语义。存储检查点的位置(例如,JobManager的内存,文件系统,数据库)取决于配置的状态后端

默认情况下,状态将保存在内存中,检查点将存储在主节点(JobManager)的内存中。 为了正确地保留大状态,Flink支持各种形式的存储和检查点状态,可以通过StreamExecutionEnvironment.setStateBackend(…)进行设置。

参阅 state backendsJavaFlink开发的一些基本配置http://flink.iteblog.com/ops/state_backends.html 了解更多关于支持的状态后端(state backends)以及作业端和集群端的详细配置。

持续更新。。。。。。

上一篇:azkaban job参数传递


下一篇:谈谈流计算中的『Exactly Once』特性