Flink Streaming Yarn flink-conf.yaml 配置参数

配置参数

1.12.5

#==============================================================================

# Common

#==============================================================================

# The external address of the host on which the JobManager runs and can be

# reached by the TaskManagers and any clients which want to connect. This setting

# is only used in Standalone mode and may be overwritten on the JobManager side

# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.

# In high availability mode, if you use the bin/start-cluster.sh script and setup

# the conf/masters file, this will be taken care of automatically. Yarn/Mesos

# automatically configure the host name based on the hostname of the node where the

# JobManager runs.

#作业管理器运行的主机的外部地址

#由任务经理和任何想要连接的客户端联系。此设置

#仅在独立模式下使用,并且可以在作业管理器侧被覆盖

#通过指定bin/jobmanager.sh可执行文件的--host<主机名>参数。

#在高可用性模式下,如果您使用bin/start-cluster.sh脚本和设置

#通信/主文件,这将被自动处理。纱线/梅索斯

#根据所在节点的主机名自动配置主机名

#运行JobManger。

jobmanager.rpc.address: localhost

# The RPC port where the JobManager is reachable.

#可访问作业管理器的RPC端口。

jobmanager.rpc.port: 6123

# The total process memory size for the JobManager.

#

# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.

#作业管理器的总进程内存大小。

#

#注意,这解释了作业管理器进程中的所有内存使用情况,包括JVM元空间和其他开销。

jobmanager.memory.process.size: 1600m

# The total process memory size for the TaskManager.

#

# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

#任务管理器的总进程内存大小。

#

#注意,这解释了任务管理器进程中的所有内存使用情况,包括JVM元空间和其他开销。

taskmanager.memory.process.size: 1728m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.

# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.

#

# taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

#要排除JVM元空间和开销,请使用总Flink内存大小,而不是“taskmanager.memory.process.size”。

#不建议同时设置“taskmanager.memory.process.size”和Flink内存。

#

#taskmanager.memory.flink.size:1280m

#每个任务管理器提供的任务插槽数。每个插槽运行一个并行的管道。

taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.

#用于未指定的程序的并行性和其他并行性。

parallelism.default: 1

# The default file system scheme and authority.

#

# By default file paths without scheme are interpreted relative to the local

# root file system 'file:///'. Use this to override the default and interpret

# relative paths relative to a different file system,

# for example 'hdfs://mynamenode:12345'

#

# fs.default-scheme

#默认的文件系统方案和权限。

#

#默认情况下,没有方案的文件路径会相对于本地路径进行解释

#根文件系统“file:///”。使用此选项可以覆盖默认值和解释

#相对于不同文件系统的相对路径,

例如“hdfs://内存:12345”

#

#fs.default-scheme

#==============================================================================

# High Availability

#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.

#

# high-availability: zookeeper

#高可用性模式。可能的选择是“没有”或“动物园管理员”。

#

高可用性:动物园管理员

# The path where metadata for master recovery is persisted. While ZooKeeper stores

# the small ground truth for checkpoint and leader election, this location stores

# the larger objects, like persisted dataflow graphs.

#

# Must be a durable file system that is accessible from all nodes

# (like HDFS, S3, Ceph, nfs, ...)

#

# high-availability.storageDir: hdfs:///flink/ha/

#主恢复的元数据的路径。而动物园管理员商店

#检查站和*选举的小地面真相,这个位置的商店

#更大的对象,比如持久化的数据流图。

#

#必须是一个可以从所有节点访问的持久的文件系统

#(比如HDFS,S3,Ceph,nfs,……)

#

#high-availability.storageDir:hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability

# setup. This must be a list of the form:

# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)

#

# high-availability.zookeeper.quorum: localhost:2181

#协调高可用性的动物园管理员仲裁对等点的列表

#设置。这必须是以下表单的列表:

#“主机1:客户端端口,host2:clientPort,...”(缺省客户端端口:2181)

#

#high-availability.zookeeper.quorum:本地主机:2181

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes

# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)

# The default value is "open" and it can be changed to "creator" if ZK security is enabled

#

# high-availability.zookeeper.client.acl: open

#ACL选项是基于https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes的

#它可以是“创建者”(ZOO_CREATE_ALL_ACL)或“open”(ZOO_OPEN_ACL_UNSAFE)

#默认值为“打开”,如果启用了ZK安全,则可以将其更改为“创建者”

#

#high-availability.zookeeper.client.acl:打开

#==============================================================================

# Fault tolerance and checkpointing

#==============================================================================

# The backend that will be used to store operator state checkpoints if

# checkpointing is enabled.

#

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the

# <class-name-of-factory>.

#

# state.backend: filesystem

#将用于存储操作员状态检查点的后端,如果

#检查点已启用。

#

#支持的后端是“作业管理器”、“文件系统”、“rocksdb”或

#<工厂名称>。

#

#state.backend:filesystem

# Directory for checkpoints filesystem, when using any of the default bundled

# state backends.

#

# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

#检查点文件系统的目录,当使用任何默认捆绑时

#状态后端。

#

#state.checkpoints.dir:hdfs://名称节点-主机:端口/闪烁-检查点

# Default target directory for savepoints, optional.

#

# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints

# Flag to enable/disable incremental checkpoints for backends that

# support incremental checkpoints (like the RocksDB state backend).

#

# state.backend.incremental: false

#保存点的默认目标目录,这是可选的。

#

#state.savepoints.dir:hdfs://名称节点-主机:端口/闪烁保存点

#标志,以启用/禁用后端的增量检查点

#支持增量检查点(如RocksDB状态后端)。

#

#state.backend.incremental:假的

# The failover strategy, i.e., how the job computation recovers from task failures.

# Only restart tasks that may have been affected by the task failure, which typically includes

# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

#故障转移策略,即作业计算如何从任务失败中恢复。

#只重新启动可能受到任务失败影响的任务,这通常包括

#下游任务和潜在的上游任务,如果它们产生的数据不再可用于消费

jobmanager.execution.failover-strategy: region

#==============================================================================

# Rest & web frontend

#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has

# not been specified, then the server will bind to this port as well.

#

#rest.port: 8081

#REST客户端连接到的端口。如果rest.bind-port有

#没有指定,那么服务器也将绑定到此端口。

#

#休息。端口:8081

# The address to which the REST client will connect to

#

#rest.address: 0.0.0.0

#REST客户端将连接到的地址

#

#rest.地址:0.0.0.0

# Port range for the REST and web server to bind to.

#

#rest.bind-port: 8080-8090

#要绑定到的REST和web服务器的端口范围。

#

#rest.bind-port:8080-8090

# The address that the REST & web server binds to

#

#rest.bind-address: 0.0.0.0

#REST&Web服务器绑定到的地址

#

#rest.bind-address:0.0.0.0

# Flag to specify whether job submission is enabled from the web-based

# runtime monitor. Uncomment to disable.

#web.submit.enable: false

#标记,以指定是否启用了基于web的作业提交

#运行时监视器。无法置评以禁用。

#web.submit.enable:假的

#==============================================================================

# Advanced

#==============================================================================

# Override the directories for temporary files. If not specified, the

# system-specific Java temporary directory (java.io.tmpdir property) is taken.

#

# For framework setups on Yarn or Mesos, Flink will automatically pick up the

# containers' temp directories without any need for configuration.

#

# Add a delimited list for multiple directories, using the system directory

# delimiter (colon ':' on unix) or a comma, e.g.:

#     /data1/tmp:/data2/tmp:/data3/tmp

#

# Note: Each directory entry is read from and written to by a different I/O

# thread. You can include the same directory multiple times in order to create

# multiple I/O threads against that directory. This is for example relevant for

# high-throughput RAIDs.

#

# io.tmp.dirs: /tmp

#覆盖临时文件的目录。如果未指定,则

已获取#特定于系统的Java临时目录(java.io.tmpdir属性)。

#

#对于纱线或台面上的框架设置,Flink将自动拾起

#容器的临时目录,而不需要任何配置。

#

#使用系统目录为多个目录添加一个带有分隔符的列表

#分隔符(unix上的冒号“:”)或逗号。:

#/data1/tmp:/data2/tmp:/data3/tmp

#

注意:每个目录条目都由不同的I/O读取和写入

#线程。您可以多次包含相同的目录,以便创建

#针对该目录的多个I/O线程。例如,这与

#高通量raid。

#

#io.tmp.dirs:/tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)

# and 'parent-first' (Java's default).

#

# Child first classloading allows users to use different dependency/library

# versions in their application than those in the classpath. Switching back

# to 'parent-first' may help with debugging dependency issues.

#

# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need

# no tuning. Adjusting them may be necessary in case of an "Insufficient number

# of network buffers" error. The default min is 64MB, the default max is 1GB.

#

# taskmanager.memory.network.fraction: 0.1

# taskmanager.memory.network.min: 64mb

# taskmanager.memory.network.max: 1gb

#类加载解析顺序。可能的值是“子级优先”(Flink的默认值)

#和“父优先”(Java的默认值)。

#

#子第一类加载允许用户使用不同的依赖关系/库

#其应用程序中的版本比类路径中的版本要多。切换回

#到“父优先”可能有助于调试依赖性问题。

#

#classloader.resolve-order:孩子优先

#进入网络堆栈的内存量。这些数字通常需要

#没有调优。如果出现“数量不足”,可能需要调整它们

网络缓冲区的#”错误。默认最小值为64MB,默认最大值为1GB。

#

#taskmanager.memory.network.fraction:0.1

#taskmanager.memory.network.min:64mb

#taskmanager.memory.network.max:1gb

#==============================================================================

# Flink Cluster Security Configuration

#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -

# may be enabled in four steps:

# 1. configure the local krb5.conf file

# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)

# 3. make the credentials available to various JAAS login contexts

# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of

# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true

# security.kerberos.login.keytab: /path/to/kerberos/keytab

# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================

# ZK Security Configuration

#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured

# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"

# zookeeper.sasl.login-context-name: Client

#==============================================================================

# HistoryServer

#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of

# monitored directories of the HistoryServer as well (see below).

#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# The address under which the web-based HistoryServer listens.

#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.

#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.

#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.

#historyserver.archive.fs.refresh-interval: 10000

基本设置.

Memory Sizes

默认的内存大小支持简单的流/批处理应用程序,但是内存太低,无法为更复杂的应用程序产生良好的性能。

  • jobmanager.memory.process.size: 总大小 of the JobManager (JobMaster / ResourceManager / Dispatcher) process.
  • taskmanager.memory.process.size: 总大小 of the TaskManager process.

The total sizes include everything. Flink will subtract some memory for the JVM’s own memory requirements (metaspace and others), and divide and configure the rest automatically between its components (JVM Heap, Off-Heap, for Task Managers also network, managed memory etc.).

总尺寸包括一切。Flink将减去JVM自己的内存需求(元空间和其他内存),并将其余的内存自动分配和配置在其组件之间(JVMHeap、Off-Heap,也用于任务管理器的网络、管理内存等)。

for example 1536m or 2g.

Parallelism

taskmanager.numberOfTaskSlots: The number of slots that a TaskManager offers (default: 1). Each slot can take one task or pipeline. Having multiple slots in a TaskManager can help amortize certain constant overheads (of the JVM, application libraries, or network connections) across parallel tasks or pipelines. See the Task Slots and Resources concepts section for details.

任务管理器提供的插槽数(默认值:1)。每个插槽可以承担一个任务或管道。在任务管理器中拥有多个插槽可以帮助跨并行任务或管道分摊某些不变的开销(JVM、应用程序库或网络连接)。有关详细信息,请参见任务插槽和资源概念部分。

Running more smaller TaskManagers with one slot each is a good starting point and leads to the best isolation between tasks. Dedicating the same resources to fewer larger TaskManagers with more slots can help to increase resource utilization, at the cost of weaker isolation between the tasks (more tasks share the same JVM).

parallelism.default: The default parallelism used when no parallelism is specified anywhere (default: 1).

运行更小的任务管理器,每个插槽是一个很好的起点,并导致任务之间的最佳隔离。将相同的资源分配给更少的更大的插槽的任务管理器可以帮助提高资源利用率,代价是任务之间的隔离较弱(更多的任务共享相同的JVM)。

parallelism.default:在任何地方都未指定并行性时使用的默认并行性(默认值:1)。

Checkpointing

You can configure checkpointing directly in code within your Flink job or application. Putting these values here in the configuration defines them as defaults in case the application does not configure anything.

  • state.backend: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are filesystem or rocksdb.
  • state.checkpoints.dir: The directory to write checkpoints to. This takes a path URI like s3://mybucket/flink-app/checkpoints or hdfs://namenode:port/flink/checkpoints.
  • state.savepoints.dir: The default directory for savepoints. Takes a path URI, similar to state.checkpoints.dir.

您可以直接在Flink作业或应用程序中的代码中配置复选指向。将这些值放在配置中,将将它们定义为默认值,以防应用程序没有配置任何东西。

state.backend:要使用的状态后端。这就定义了用于获取快照的数据结构机制。常见的值是文件系统或rocksdb。

state.checkpoints.dir:要写入检查点的目录。这需要一个路径URI,比如mybbup/闪烁应用/检查点或hdfs:端口、闪烁/检查点。

state.savepoints.dir:保存点的默认目录。采用路径URI,类似于state.checkpoints.dir。

Web UI

  • web.submit.enable: Enables uploading and starting jobs through the Flink UI (true by default). Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
  • web.upload.dir: The directory where to store uploaded jobs. Only used when web.submit.enable is true.

web.submit.enable:允许通过FlinkUI上传和启动作业(默认情况下为真)。请注意,即使禁用了此操作,会话集群仍然通过REST请求(HTTP调用)接受作业。此标志仅保护要在UI中上传作业的功能。

web.upload.dir:存储上载作业的目录。仅在web.submit.enable为true时使用。

Other

io.tmp.dirs: The directories where Flink puts local data, defaults to the system temp directory (java.io.tmpdir property). If a list of directories is configured, Flink will rotate files across the directories.

The data put in these directories include by default the files created by RocksDB, spilled intermediate results (batch algorithms), and cached jar files.

This data is NOT relied upon for persistence/recovery, but if this data gets deleted, it typically causes a heavyweight recovery operation. It is hence recommended to set this to a directory that is not automatically periodically purged.

Yarn, Mesos, and Kubernetes setups automatically configure this value to the local working directories by default.

io.tmp.dirs:Flink放置本地数据的目录,默认为系统临时目录(java.io.tmpdir属性)。如果配置了目录列表,Flink将跨目录旋转文件。

默认情况下,这些目录中的数据包括RocksDB创建的文件、溢出的中间结果(批处理算法)和缓存的jar文件。

此数据不依赖于持久性/恢复,但如果此数据被删除,它通常会导致一个量级的恢复操作。因此,建议将其设置为不自动定期清除的目录。

默认情况下,纱线、台面和库伯网设置会自动将此值配置为本地工作目录

常见的设置选项

 容错(性)

这些配置选项控制Flink在执行期间发生失败时的重启行为。通过在flink-conf.yaml中配置这些选项,您可以定义集群的默认重新启动策略。

只有在没有配置特定于作业的重新启动策略并通过执行配置时,默认的重新启动策略才会生效。

Key

Default

Type

Description

restart-strategy

(none)

String

Defines the restart strategy to use in case of job failures.
Accepted values are:

  • none, off, disable: No restart strategy.
  • fixeddelay, fixed-delay: Fixed delay restart strategy. More details can be found here.
  • failurerate, failure-rate: Failure rate restart strategy. More details can be found here.

If checkpointing is disabled, the default value is none. If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and '1 s' delay.

定义要在作业失败时使用的重新启动策略。已接受的值为:

无、关闭、禁用:无重启策略。

固定延迟,固定延迟:固定延迟重启策略。更多的细节可以在这里找到。

故障率、故障率:故障率重启策略。更多的细节可以在这里找到。

如果禁用了复选指向,则默认值为无。如果启用了选点,则默认值为整数固定延迟。MAX_VALUE重新启动尝试和“1”延迟。

Fixed Delay Restart Strategy

Key

Default

Type

Description

restart-strategy.fixed-delay.attempts

1

Integer

The number of times that Flink retries the execution before the job is declared as failed if restart-strategy has been set to fixed-delay.

如果将重新启动策略设置为固定延迟,则Flink在作业声明为失败之前重试执行的次数。

restart-strategy.fixed-delay.delay

1 s

Duration

Delay between two consecutive restart attempts if restart-strategy has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: "1 min", "20 s"

如果将重启策略设置为固定延迟,则连续两次重启尝试之间的延迟。当程序与外部系统交互时,延迟重试会有帮助,例如,连接或挂起事务应该在尝试重新执行之前达到超时。可以使用“1分钟”、“20秒”等符号来指定

Failure Rate Restart Strategy

Key

Default

Type

Description

restart-strategy.failure-rate.delay

1 s

Duration

Delay between two consecutive restart attempts if restart-strategy has been set to failure-rate. It can be specified using notation: "1 min", "20 s"

如果已将重新启动策略设置为失败率,则连续两次重启尝试之间的延迟。可以使用“1分钟”、“20秒”等符号来指定

restart-strategy.failure-rate.failure-rate-interval

1 min

Duration

Time interval for measuring failure rate if restart-strategy has been set to failure-rate. It can be specified using notation: "1 min", "20 s"

如果已将重新启动策略设置为故障率,则测量故障率的时间间隔。可以使用“1分钟”、“20秒”等符号来指定

restart-strategy.failure-rate.max-failures-per-interval

1

Integer

Maximum number of restarts in given time interval before failing a job if restart-strategy has been set to failure-rate.

如果重启策略设置为失败率,在给定时间间隔内作业失败前重新启动的最大次数。

Checkpoints and State Backends

Key

Default

Type

Description

state.backend

(none)

String

The state backend to be used to store and checkpoint state.

要用于存储和检查点状态的状态后端。

state.checkpoints.dir

(none)

String

The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).

用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。存储路径必须可从所有参与的进程/节点(即所有任务管理器和作业经理)访问。

state.savepoints.dir

(none)

String

The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).

保存点的默认目录。由将保存点写入文件系统的状态后端使用(内存状态后端、文件状态后端、RocksDBState后端)。

state.backend.incremental

false

Boolean

Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option.

尽可能选择状态后端是否应该创建增量检查点。对于增量检查点,只存储与前一个检查点不同的部分,而不是完整的检查点状态。启用后,WebUI中显示或从restAPI获取的状态大小只表示delta检查点大小,而不是完整的检查点大小。一些状态后端可能不支持增量检查点,并忽略此选项。

state.backend.local-recovery

false

Boolean

This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignore this option.

此选项可为此状态后端配置本地恢复。默认情况下,将停用本地恢复。本地恢复当前只覆盖键控状态后端。目前,内存状态后端不支持本地恢复,并忽略此选项。

state.checkpoints.num-retained

1

Integer

The maximum number of completed checkpoints to retain.

要保留的最大已完成的检查点的数量

taskmanager.state.local.root-dirs

(none)

String

The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignore this option

配置参数,定义用于存储基于文件的本地恢复状态的根目录。本地恢复当前只覆盖键控状态后端。目前,内存状态后端不支持本地恢复,并忽略此选项

High Availability

High-availability here refers to the ability of the JobManager process to recover from failures.

The JobManager ensures consistency during recovery across TaskManagers. For the JobManager itself to recover consistently, an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”), as well as help to elect and lock which JobManager is the leader (to avoid split-brain situations).

这里的高可用性是指作业管理器流程从故障中恢复的能力。

工作管理器可确保跨任务管理器在恢复期间的一致性。为了使作业管理器本身持续恢复,外部服务必须存储少量的恢复元数据(如“上次提交检查点的ID”),以及帮助选择和锁定作业管理器的领导者(以避免脑裂情况)。

Key

Default

Type

Description

high-availability

"NONE"

String

Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.

定义用于集群执行的高可用性模式。要启用高可用性,请将此模式设置为“动物园管理员”或指定工厂类的FQN。

high-availability.cluster-id

"/default"

String

The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.

Flink集群的ID,用于将多个Flink集群相互分离。需要为独立的集群进行设置,但可以在YARN和Mesos中自动推断出来。

high-availability.storageDir

(none)

String

File system path (URI) where Flink persists metadata in high-availability setups.

Flink保留元数据的文件系统路径(URI)。

Options for high-availability setups with ZooKeeper

Key

Default

Type

Description

high-availability.zookeeper.path.root

"/flink"

String

The root path under which Flink stores its entries in ZooKeeper.

Flink在ZooKeeper中存储其条目的根路径。

high-availability.zookeeper.quorum

(none)

String

The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.

在使用ZooKeeper的高可用性模式下运行Flink时,要使用的ZooKeeper仲裁。

Memory Configuration

These configuration values control the way that TaskManagers and JobManagers use memory.

Flink tries to shield users as much as possible from the complexity of configuring the JVM for data-intensive processing. In most cases, users should only need to set the values taskmanager.memory.process.size or taskmanager.memory.flink.size (depending on how the setup), and possibly adjusting the ratio of JVM heap and Managed Memory via taskmanager.memory.managed.fraction. The other options below can be used for performance tuning and fixing memory related errors.

For a detailed explanation of how these options interact, see the documentation on TaskManager and JobManager memory configurations.

这些配置值控制着任务管理器和作业管理器使用内存的方式。

Flink试图尽可能多地保护用户免受配置JVM以进行数据密集型处理的复杂性的影响。在大多数情况下,用户应该只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置方式),并可能通过taskmanager.memory.managed.fraction调整JVM堆和管理内存的比例。下面的其他选项可用于性能调整和修复与内存相关的错误。

有关这些选项如何交互的详细说明,请参阅关于任务管理器和作业管理器内存配置的文档。

Key

Default

Type

Description

jobmanager.memory.enable-jvm-direct-memory-limit

false

Boolean

Whether to enable the JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize). The limit will be set to the value of 'jobmanager.memory.off-heap.size' option.

是否启用作业管理器进程的JVM直接内存限制(-XX:最大内存大小)。该限制将被设置为“jobmanager.memory.off-heap.size”选项的值。

jobmanager.memory.flink.size

(none)

MemorySize

Total Flink Memory size for the JobManager. This includes all the memory that a JobManager consumes, except for JVM Metaspace and JVM Overhead. It consists of JVM Heap Memory and Off-heap Memory. See also 'jobmanager.memory.process.size' for total process memory size configuration.

工作管理器的总快速链接内存大小。这包括工作管理器消耗的所有内存,除了JVM元空间和JVM开销。它由JVM堆内存和堆内存组成。有关总进程内存大小的配置,请参见“jobmanager.memory.process.size”。

jobmanager.memory.heap.size

(none)

MemorySize

JVM Heap Memory size for JobManager.

针对作业管理器的JVM堆的内存大小

jobmanager.memory.jvm-metaspace.size

256 mb

MemorySize

JVM Metaspace Size for the JobManager.

作业管理器的JVM元空间大小。

jobmanager.memory.jvm-overhead.fraction

0.1

Float

Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.

要为JVM开销保留的总进程内存的比例。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会计数。JVM开销的大小将占总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,则将使用最小或最大大小。可以通过将最小大小和最大大小设置为相同的值来显式地指定JVM开销的确切大小。

jobmanager.memory.jvm-overhead.max

1 gb

MemorySize

Max JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.

工作管理器的最大JVM开销大小。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会计数。JVM开销的大小将占总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,则将使用最小或最大大小。可以通过将最小大小和最大大小设置为相同的值来显式地指定JVM开销的确切大小。

jobmanager.memory.jvm-overhead.min

192 mb

MemorySize

Min JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value.

工作管理器的最小JVM开销大小。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会被计数。JVM开销的大小将占总进程内存的配置部分。如果派生的大小小于或大于配置的最小或最大大小,则将使用最小或最大大小。可以通过将最小大小和最大大小设置为相同的值来显式地指定JVM开销的确切大小。

jobmanager.memory.off-heap.size

128 mb

MemorySize

Off-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by 'jobmanager.memory.enable-jvm-direct-memory-limit'.

作业管理器的堆内存大小。此选项涵盖了所有堆外内存的使用情况,包括直接内存和本机内存的分配。如果‘jobmanager.memory.enable-jvm-direct-memory-limit’。启用了作业管理器进程的JVM直接内存限制(-XX:最大直接内存限制)将被设置为此值

jobmanager.memory.process.size

(none)

MemorySize

Total Process Memory size for the JobManager. This includes all the memory that a JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this should be set to the container memory. See also 'jobmanager.memory.flink.size' for Total Flink Memory size configuration.

作业管理器的总进程内存大小。这包括工作管理器JVM进程消耗的所有内存,包括总翻转内存、JVM元空间和JVM开销。在容器化的设置中,应该将其设置为容器内存。有关“jobmanager.memory.flink.size”内存大小配置,请参见“总”。

taskmanager.memory.flink.size

(none)

MemorySize

Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Network Memory. See also 'taskmanager.memory.process.size' for total process memory size configuration.

任务执行器的总快速链接内存大小。这包括任务执行器消耗的所有内存,除了JVM元空间和JVM开销。它由框架堆内存、任务堆内存、任务离堆内存、管理内存和网络内存组成。有关总进程内存大小的配置,请参见“taskmanager.memory.process.size”。

taskmanager.memory.framework.heap.size

128 mb

MemorySize

Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for TaskExecutor framework, which will not be allocated to task slots.

针对任务执行器的框架堆内存大小。这是为任务执行器框架保留的JVM堆内存的大小,它将不会分配给任务插槽。

taskmanager.memory.framework.off-heap.size

128 mb

MemorySize

Framework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

针对任务执行器的框架非堆堆内存大小。这是为任务执行器框架保留的堆外内存(JVM直接内存和本机内存)的大小,它将不会被分配给任务插槽。当Flink计算JVM最大直接内存大小参数时,将完全计算已配置的值。

taskmanager.memory.jvm-metaspace.size

256 mb

MemorySize

JVM Metaspace Size for the TaskExecutors.

任务执行者的JVM元空间大小。

taskmanager.memory.jvm-overhead.fraction

0.1

Float

Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.

要为JVM开销保留的总进程内存的比例。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会计数。JVM开销的大小将占总进程内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小/最大大小设置为相同的值来显式地指定JVM开销的确切大小。

taskmanager.memory.jvm-overhead.max

1 gb

MemorySize

Max JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.

任务执行者的最大JVM开销大小。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会计数。JVM开销的大小将占总进程内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小/最大大小设置为相同的值来显式地指定JVM开销的确切大小。

taskmanager.memory.jvm-overhead.min

192 mb

MemorySize

Min JVM Overhead size for the TaskExecutors. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.

任务执行人的最小JVM开销大小。这是为JVM开销保留的堆内存,如线程堆栈空间、编译缓存等。这包括本机内存,但不包括直接内存,当Flink计算JVM最大直接内存大小参数时将不会计数。JVM开销的大小将占总进程内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小/最大大小设置为相同的值来显式地指定JVM开销的确切大小。

taskmanager.memory.managed.consumer-weights

DATAPROC:70,PYTHON:30

Map

Managed memory weights for different kinds of consumers. A slot’s managed memory is shared by all kinds of consumers it contains, proportionally to the kinds’ weights and regardless of the number of consumers from each kind. Currently supported kinds of consumers are DATAPROC (for RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON (for Python processes).

针对不同类型的消费者的管理内存权重。一个插槽的托管内存由它所包含的各种用户共享,这与类型的权重成比例,而不管每种类型的用户数量是多少。目前支持的消费者类型是数据进程(用于流媒体中的RocksDB状态后端和批处理中的内置算法)和Python(用于Python进程)。

taskmanager.memory.managed.fraction

0.4

Float

Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified.

如果未显式指定管理内存大小,则将用作管理内存的总翻转内存的分数。

taskmanager.memory.managed.size

(none)

MemorySize

Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.

针对任务执行器的托管内存大小。这是由内存管理器管理的堆外内存的大小,保留用于排序、散列表、中间结果的缓存和RocksDB状态后端。内存使用者可以以内存片段的形式从内存管理器分配内存,也可以从内存管理器保留字节,并将其内存使用量保持在该边界内。如果未指定,它将被派生为占总翻转内存的配置部分。

taskmanager.memory.network.fraction

0.1

Float

Fraction of Total Flink Memory to be used as Network Memory. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max size to the same value.

要用作网络内存的总翻转内存的分数。网络内存是为Shuffle环境保留的堆内存(例如网络缓冲区)。网络内存大小占总翻转内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小大小/最大大小设置为相同的值,来显式地指定网络内存的确切大小。

taskmanager.memory.network.max

1 gb

MemorySize

Max Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.

任务执行器的最大网络内存大小。网络内存是为Shuffle环境保留的堆内存(例如网络缓冲区)。网络内存大小占总翻转内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小值/最大值设置为相同的值,来显式地指定网络内存的确切大小。

taskmanager.memory.network.min

64 mb

MemorySize

Min Network Memory size for TaskExecutors. Network Memory is off-heap memory reserved for ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max to the same value.

任务执行器的最小网络内存大小。网络内存是为Shuffle环境保留的堆内存(例如网络缓冲区)。网络内存大小占总翻转内存的配置部分。如果派生的尺寸小于/大于配置的最小/最大尺寸,则将使用最小/最大尺寸。可以通过将最小值/最大值设置为相同的值来显式地指定网络内存的确切大小。

taskmanager.memory.process.size

(none)

MemorySize

Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also 'taskmanager.memory.flink.size' for total Flink memory size configuration.

任务执行器的总进程内存大小。这包括任务执行器消耗的所有内存,包括总翻转内存、JVM元空间和JVM开销。在容器化的设置上,应该将其设置为容器内存。有关总Flink内存大小的配置,请参见“taskmanager.memory.flink.size”。

taskmanager.memory.task.heap.size

(none)

MemorySize

Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Framework Off-Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory.

任务执行器的任务堆内存大小。这是为任务保留的JVM堆内存的大小。如果没有指定,它将被导出为总Flink内存减去框架堆内存、框架离堆内存、任务脱堆内存、管理内存和网络内存。

taskmanager.memory.task.off-heap.size

0 bytes

MemorySize

Task Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

任务执行器的任务脱堆内存大小。这是为任务保留的离堆内存(JVM直接内存和本机内存)的大小。当Flink计算JVM最大直接内存大小参数时,将完全计算已配置的值。

Miscellaneous Options

Key

Default

Type

Description

fs.allowed-fallback-filesystems

(none)

String

A (semicolon-separated) list of file schemes, for which Hadoop can be used instead of an appropriate Flink plugin. (example: s3;wasb)

一个(分号分隔的)文件方案列表,可以使用Hadoop,而不是一个适当的Flink插件。(例如:s3、Wasb)

fs.default-scheme

(none)

String

The default filesystem scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g. host:port in case of an HDFS NameNode.

默认的文件系统方案,用于不显式声明方案的路径。可能包含一个权限,例如:HDFS参数节点时的主机端口。

io.tmp.dirs

'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.

String

Directories for temporary files, separated by",", "|", or the system's java.io.File.pathSeparator.

临时文件的目录,由“、”、“|”或系统的java.io分隔。File.pathSeparator.

安全

配置Flink与外部系统的安全和安全交互的选项。

SSL

Flink’s network connections can be secured via SSL. Please refer to the SSL Setup Docs for detailed setup guide and background.

Key

Default

Type

Description

security.ssl.algorithms

"TLS_RSA_WITH_AES_128_CBC_SHA"

String

The comma separated list of standard SSL algorithms to be supported. Read more here

需要支持的标准SSL算法的逗号分隔列表。在这里阅读更多信息

security.ssl.internal.cert.fingerprint

(none)

String

The sha1 fingerprint of the internal certificate. This further protects the internal communication to present the exact certificate used by Flink.This is necessary where one cannot use private CA(self signed) or there is internal firm wide CA is required

内部证书的sha1指纹。这进一步保护了内部通信,以呈现Flink使用的确切证书。如果不能使用私有CA(自签名)或需要内部公司范围的CA,这是必要的

security.ssl.internal.enabled

false

Boolean

Turns on SSL for internal network communication. Optionally, specific components may override this through their own settings (rpc, data transport, REST, etc).

打开SSL以进行内部网络通信。或者,特定的组件可以通过它们自己的设置(rpc、数据传输、REST等)来覆盖它。

security.ssl.internal.key-password

(none)

String

The secret to decrypt the key in the keystore for Flink's internal endpoints (rpc, data transport, blob server).

解密Flink内部端点(rpc、数据传输、blob服务器)密钥存储中的密钥的秘密。

security.ssl.internal.keystore

(none)

String

The Java keystore file with SSL Key and Certificate, to be used Flink's internal endpoints (rpc, data transport, blob server).

带有SSL密钥和证书的Java密钥存储文件,将使用Flink的内部端点(rpc、数据传输、blob服务器)。

security.ssl.internal.keystore-password

(none)

String

The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server).

解密针对Flink的内部端点(rpc、数据传输、blob服务器)的Flink的密钥存储文件的秘密。

security.ssl.internal.truststore

(none)

String

The truststore file containing the public CA certificates to verify the peer for Flink's internal endpoints (rpc, data transport, blob server).

包含用于验证Flink内部端点(rpc、数据传输、blob服务器)的对等点的公共CA证书的信任存储文件。

security.ssl.internal.truststore-password

(none)

String

The password to decrypt the truststore for Flink's internal endpoints (rpc, data transport, blob server).

解密Flink内部端点(rpc、数据传输、blob服务器)的信任存储的密码。

security.ssl.protocol

"TLSv1.2"

String

The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support comma separated list.

SSl传输所支持的SSL协议版本。请注意,它不支持用逗号分隔的列表。

security.ssl.rest.authentication-enabled

false

Boolean

Turns on mutual SSL authentication for external communication via the REST endpoints.

通过REST端点为外部通信启用相互SSL身份验证。

security.ssl.rest.cert.fingerprint

(none)

String

The sha1 fingerprint of the rest certificate. This further protects the rest REST endpoints to present certificate which is only used by proxy serverThis is necessary where once uses public CA or internal firm wide CA

其余证书的sha1指纹。这进一步保护了其余的REST端点,该证书仅被代理服务器使用。一旦使用公共CA或内部公司范围的CA,这是必要的

security.ssl.rest.enabled

false

Boolean

Turns on SSL for external communication via the REST endpoints.

通过REST端点打开SSL以进行外部通信。

security.ssl.rest.key-password

(none)

String

The secret to decrypt the key in the keystore for Flink's external REST endpoints.

security.ssl.rest.keystore

(none)

String

The Java keystore file with SSL Key and Certificate, to be used Flink's external REST endpoints.

带有SSL密钥和证书的Java密钥存储文件,将使用Flink的外部REST端点。

security.ssl.rest.keystore-password

(none)

String

The secret to decrypt the keystore file for Flink's for Flink's external REST endpoints.

解密针对Flink的外部REST端点的FLink的密钥存储文件的秘密。

security.ssl.rest.truststore

(none)

String

The truststore file containing the public CA certificates to verify the peer for Flink's external REST endpoints.

包含用于验证Flink的外部REST端点的对等点的公共CA证书的信任存储文件。

security.ssl.rest.truststore-password

(none)

String

The password to decrypt the truststore for Flink's external REST endpoints.

用来解密Flink的外部REST端点的信任库的密码。

security.ssl.verify-hostname

true

Boolean

Flag to enable peer’s hostname verification during ssl handshake.

在ssl握手期间启用对等的主机名验证的标志。

Auth with External Systems 使用外部系统进行认证

ZooKeeper Authentication / Authorization

These options are necessary when connecting to a secured ZooKeeper quorum.

动物管理员的认证/授权

当连接到一个安全的动物园管理员仲裁时,这些选项是必要的。

Key

Default

Type

Description

zookeeper.sasl.disable

false

Boolean

zookeeper.sasl.login-context-name

"Client"

String

zookeeper.sasl.service-name

"zookeeper"

String

Kerberos-based Authentication / Authorization

Please refer to the Flink and Kerberos Docs for a setup guide and a list of external system to which Flink can authenticate itself via Kerberos.

Key

Default

Type

Description

security.kerberos.login.contexts

(none)

String

A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication)

以逗号分隔的登录上下文列表(例如,提供`kerberes凭据,使用动物守护者身份验证和卡夫卡身份验证的凭据)

security.kerberos.login.keytab

(none)

String

Absolute path to a Kerberos keytab file that contains the user credentials.

指向包含用户凭据的Kerberos密钥选项卡文件的绝对路径。

security.kerberos.login.principal

(none)

String

Kerberos principal name associated with the keytab.

与键选项卡相关联的Kerberos主体名称。

security.kerberos.login.use-ticket-cache

true

Boolean

Indicates whether to read from your Kerberos ticket cache.

指示是否从Kerberos票务缓存中读取。

Resource Orchestration Frameworks 资源编排框架

This section contains options related to integrating Flink with resource orchestration frameworks, like Kubernetes, Yarn, Mesos, etc.

Note that is not always necessary to integrate Flink with the resource orchestration framework. For example, you can easily deploy Flink applications on Kubernetes without Flink knowing that it runs on Kubernetes (and without specifying any of the Kubernetes config options here.) See this setup guide for an example.

The options in this section are necessary for setups where Flink itself actively requests and releases resources from the orchestrators.

本节包含与集成Flink和资源编排框架相关的选项,如库伯内特斯、纱线、台面等。

注意,将Flink与资源编排框架集成并不总是必要的。例如,您可以轻松地在库伯内e上部署Flink应用程序,而不知道Flink在库伯内e上运行(而且在这里不指定任何库伯内e配置选项)。有关一个示例,请参见本设置指南。

本节中的选项对于Flinc本身主动请求和从编排器释放资源的设置是必需的。

YARN

Key

Default

Type

Description

external-resource.<resource_name>.yarn.config-key

(none)

String

If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be set to the value of external-resource.<resource_name>.amount.

如果配置,Flink将此键添加到Yarn的容器请求的资源配置文件。该值将被设置为external-resource.<resource_name>。amount的值。

yarn.application-attempt-failures-validity-interval

10000

Long

Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See here for more information.

以毫秒为单位的时间窗口,它定义了重新启动AM时应用程序尝试失败的次数。不考虑位于此窗口之外的故障。将此值设置为-1,以进行全局计数。有关更多信息,请参阅这里。

yarn.application-attempts

(none)

String

Number of ApplicationMaster restarts. By default, the value will be set to 1. If high availability is enabled, then the default value will be 2. The restart number is also limited by YARN (configured via yarn.resourcemanager.am.max-attempts). Note that that the entire Flink cluster will restart and the YARN Client will lose the connection.

应用程序主程序重新启动数。默认情况下,该值将被设置为1。如果启用了高可用性,则默认值将为2。重启编号也受到YARN的限制(通过yarn.resourcemanager.am.max-attempts进行配置)。请注意,整个Flink集群将重新启动,YARN客户端将失去连接。

yarn.application-master.port

"0"

String

With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.

使用此配置选项,用户可以为应用程序主服务器(和作业管理器)RPC端口指定端口、端口范围或端口列表。默认情况下,我们建议使用默认值(0)来让操作系统选择一个合适的端口。特别是当多个AM在同一物理主机上运行时,固定端口分配会阻止AM启动。例如,当在具有限制性防火墙的环境上在YARN上运行Flink时,此选项允许指定允许的端口范围。

yarn.application.id

(none)

String

The YARN application id of the running yarn cluster. This is the YARN cluster where the pipeline is going to be executed.

正在运行的纱线集群的YARN应用程序iD。这是要执行管道的YARN集群。

yarn.application.name

(none)

String

A custom name for your YARN application.

一个针对YARN应用程序的自定义名称。

yarn.application.node-label

(none)

String

Specify YARN node label for the YARN application.

指定该YARN应用程序的YARN节点标签。

yarn.application.priority

-1

Integer

A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority. Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.

表示提交FlinkYARN应用程序的优先级的非负整数。它只有在启用了年优先级调度设置时才会生效。整数越大,优先级就越高。如果优先级为负值或设置为“-1”(默认值),Flink将取消设置纱线优先级设置,并使用集群默认优先级。有关启用目标YARN版本的优先级调度所需的特定设置,请参考YARN的官方文档。

yarn.application.queue

(none)

String

The YARN queue on which to put the current pipeline.

将当前管道放置在的YARN队列

yarn.application.type

(none)

String

A custom type for your YARN application..

一个针对YARN应用程序的自定义类型。

yarn.appmaster.vcores

1

Integer

The number of virtual cores (vcores) used by YARN application master.

YARN应用程序主服务器使用的虚拟内核(vcore)数。

yarn.containers.vcores

-1

Integer

The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.

每个YARN容器的虚拟核心数(vcore)。默认情况下,vcore数设置为每个任务管理器的插槽数,如果设置为1,则设置为1。为了使用此参数,您的集群必须启用CPU调度。您可以通过设置org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.来实现这一点费尔调度器。

yarn.file-replication

-1

Integer

Number of file replication of each local resource file. If it is not configured, Flink will use the default replication value in hadoop configuration.

每个本地资源文件的文件复制数。如果未配置,Flink将在hadoop配置中使用默认的复制值。

yarn.flink-dist-jar

(none)

String

The location of the Flink dist jar.

yarn.heartbeat.container-request-interval

500

Integer

Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:

  • The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.
  • The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.

If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See this link for more information.

如果Flink请求容器,则与资源管理器的心跳之间的时间以毫秒为单位:

这个值越低,Flink就越快收到容器分配,因为请求和分配是通过心跳传输的。

这个值越低,可能分配的容器就越多,这些容器最终会被释放,但会给纱线带来压力。

如果您在资源管理器上观察到太多的容器分配,则建议增加此值。有关更多信息,请参见此链接。

yarn.heartbeat.interval

5

Integer

Time between heartbeats with the ResourceManager in seconds.

与资源管理器心跳之间的时间为秒。

yarn.per-job-cluster.include-user-jar

"ORDER"

String

Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). "DISABLED" means the user-jars are excluded from the system class path.

定义用户罐是否包含在每个作业集群的系统类路径中,以及它们在路径中的位置。它们可以定位在开头(“第一”)、结尾(“LAST”),或者根据它们的名称(“顺序”)进行定位。“禁用”意味着用户罐被排除在系统类路径之外。

yarn.properties-file.location

(none)

String

When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).

当Flink作业提交给YARN时,JobManger的主机和可用处理插槽的数量将被写入属性文件,以便Flink客户端能够获取这些详细信息。此配置参数允许更改该文件的默认位置(例如,在用户之间共享Flink安装的环境)。

yarn.provided.lib.dirs

(none)

List<String>

A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they doesn't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib

所提供的库目录的以分号分隔的列表。它们应该是预先上传的和世界可读的。Flink将使用它们来排除本地的Flink罐子(例如Flink-dist、lib/、插件/)的上传,以加速作业提交过程。YARN还会将它们缓存在节点上,这样就不需要每次为每个应用程序下载它们了。一个例子可以是hdfs://$namenode_address/path/of/flink/lib

yarn.security.kerberos.additionalFileSystems

(none)

List<String>

A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens.

将访问逗号分隔的其他kerberos安全的Hadoop文件系统的逗号分隔列表。例如,yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.提交给YARN的客户端需要能够访问这些文件系统来检索安全令牌。

yarn.security.kerberos.localized-keytab-path

"krb5.keytab"

String

Local (on NodeManager) path where kerberos keytab file will be localized to. If yarn.security.kerberos.ship-local-keytab set to true, Flink willl ship the keytab file as a YARN local resource. In this case, the path is relative to the local resource directory. If set to false, Flink will try to directly locate the keytab from the path itself.

本地(节点管理器文件将本地化的节点管理器)路径。如果yarn.security.kerberos.ship-local-keytab设置为true,Flink将把键选项卡文件作为YARN本地资源发布。在这种情况下,路径相对于本地资源目录。如果设置为false,Flink将尝试直接从路径本身找到键选项卡。

yarn.security.kerberos.ship-local-keytab

true

Boolean

When this is true Flink will ship the keytab file configured via security.kerberos.login.keytab as a localized YARN resource.

当这是真的时,Flink将发布通过security.kerberos.login.keytab配置为本地化码资源的键选项卡文件。

yarn.ship-archives

(none)

List<String>

A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".

要发送到YARN集群的以分号分隔的档案列表。在本地化时,这些档案将被解打包,它们可以是以下任何一种类型:“。tar.gz”、“。tar”、“。tgz”、“。dst”、“。jar”、“。zip”。

yarn.ship-files

(none)

List<String>

A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.

要发送到YARN集群的文件和/或目录的以分号分隔的列表。

yarn.staging-directory

(none)

String

Staging directory used to store YARN files while submitting applications. Per default, it uses the home directory of the configured file system.

提交应用程序时存储日元文件的分期目录。默认情况下,它使用配置文件系统的主目录。

yarn.tags

(none)

String

A comma-separated list of tags to apply to the Flink YARN application.

要应用于FlinkYARN应用程序的以逗号分隔的标记列表。

状态后端

RocksDB State Backend

这些都是配置RocksDB状态后端通常需要的选项。有关高级低级配置和故障排除所需的选项,请参见高级RocksDB后端部分。

Key

Default

Type

Description

state.backend.rocksdb.memory.fixed-per-slot

(none)

MemorySize

The fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the 'state.backend.rocksdb.memory.managed' option when configured. If neither this option, nor the 'state.backend.rocksdb.memory.managed' optionare set, then each RocksDB column family state has its own memory caches (as controlled by the column family options).

固定的内存总量,在每个插槽中的所有RocksDB实例之间共享的。此选项在配置时将覆盖“state.backend.rocksdb.memory.managed”选项。如果既没有设置此选项,也没有设置“state.backend.rocksdb.memory.managed”选项,那么每个RocksDB列族状态都有自己的内存缓存(由列族选项控制)。

state.backend.rocksdb.memory.high-prio-pool-ratio

0.1

Double

The fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.

为索引、过滤器和压缩字典块等高优先级数据保留的缓存内存的比例。此选项仅在配置了“state.backend.rocksdb.memory.managed”或“state.backend.rocksdb.memory.fixed-per-slot”时才有效果。

state.backend.rocksdb.memory.managed

true

Boolean

If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.

state.backend.rocksdb.memory.write-buffer-ratio

0.5

Double

The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.

写入缓冲区可能占用的最大内存量,占总共享内存的一部分。此选项仅在配置了“state.backend.rocksdb.memory.managed”或“state.backend.rocksdb.memory.fixed-per-slot”时才有效果。

如果设置,RocksDB状态后端将自动配置为使用任务插槽的管理内存预算,并将内存划分为写入缓冲区、索引、块缓存等。这样,RocksDB内存的三个主要用途就将受到限制。

state.backend.rocksdb.timer-service.factory

ROCKSDB

Enum

Possible values: [HEAP, ROCKSDB]

This determines the factory for timer service state implementation. Options are either HEAP (heap-based) or ROCKSDB for an implementation based on RocksDB.

这决定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项是HEAP(基于堆)或Rocksdb。

Metrics

Please refer to the metrics system documentation for background on Flink’s metrics infrastructure.

Key

Default

Type

Description

metrics.fetcher.update-interval

10000

Long

Update interval for the metric fetcher used by the web UI in milliseconds. Decrease this value for faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 disables the metric fetching completely.

webUI使用的度量获取器的更新间隔为毫秒。降低此值以获得更快的更新指标。如果度量获取器导致过多的负载,则增加此值。将此值设置为0将完全禁用度量抓取。

metrics.internal.query-service.port

"0"

String

The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.

用于Flink的内部度量查询服务的端口范围。接受端口列表(“50100、50101”)、范围(“50100-50200”)或两者的组合。建议设置一系列端口,以避免在同一台机器上运行多个Flink组件时发生碰撞。每个默认的Flink将选择一个随机端口。

metrics.internal.query-service.thread-priority

1

Integer

The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.

用于Flink的内部度量查询服务的线程优先级。该线程是由Akka的线程池执行器创建的。优先级的范围是从1(MIN_PRIORITY)到10(MAX_PRIORITY)。警告,增加此值可能会导致主Flink组件关闭。

metrics.latency.granularity

"operator"

String

Defines the granularity of latency metrics. Accepted values are:

  • single - Track latency without differentiating between sources and subtasks.
  • operator - Track latency while differentiating between sources, but not subtasks.
  • subtask - Track latency while differentiating between sources and subtasks.

定义延迟度量的粒度。已接受的值为:

不区分源任务和子任务的单个跟踪延迟。

操作员-跟踪延迟,同时区分源,而不是子任务。

子任务-在区分源任务和子任务时跟踪延迟。

metrics.latency.history-size

128

Integer

Defines the number of measured latencies to maintain at each operator.

定义每个操作员要维护的测量延迟数。

metrics.latency.interval

0

Long

Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.

定义从源源发出延迟跟踪标记的时间间隔。如果设置为0或为负值,则禁用延迟跟踪。启用此特性可能会显著影响集群的性能。

metrics.reporter.<name>.<parameter>

(none)

String

Configures the parameter <parameter> for the reporter named <name>.

为名为<name>的报告器配置参数<参数>。

metrics.reporter.<name>.class

(none)

String

The reporter class to use for the reporter named <name>.

用于<的名为>的记者类。

metrics.reporter.<name>.interval

10 s

Duration

The reporter interval to use for the reporter named <name>.

用于名为<名称>的报告器时间间隔。

metrics.reporters

(none)

String

An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list will be started. Otherwise, all reporters that could be found in the configuration will be started.

一个可选的记者姓名列表。如果配置,将只启动名称与列表中任何名称匹配的记者。否则,将启动在配置中可以找到的所有记者。

metrics.scope.delimiter

"."

String

Delimiter used to assemble the metric identifier.

metrics.scope.jm

"<host>.jobmanager"

String

Defines the scope format string that is applied to all metrics scoped to a JobManager.

定义应用于指向Job管理器的所有度量的范围格式字符串。

用于组装度量标识符的分隔符。

metrics.scope.jm.job

"<host>.jobmanager.<job_name>"

String

Defines the scope format string that is applied to all metrics scoped to a job on a JobManager.

定义应用于作业管理器上作业的所有度量的范围格式字符串。

metrics.scope.operator

"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"

String

Defines the scope format string that is applied to all metrics scoped to an operator.

定义应用于指向操作符的所有度量的范围格式字符串。

metrics.scope.task

"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"

String

Defines the scope format string that is applied to all metrics scoped to a task.

定义应用于任务范围内的所有度量的范围格式字符串。

metrics.scope.tm

"<host>.taskmanager.<tm_id>"

String

Defines the scope format string that is applied to all metrics scoped to a TaskManager.

定义应用于任务管理器范围内的所有度量的范围格式字符串。

metrics.scope.tm.job

"<host>.taskmanager.<tm_id>.<job_name>"

String

Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager.

定义应用于任务管理器上作业的所有度量的作用域格式字符串。

metrics.system-resource

false

Boolean

Flag indicating whether Flink should report system resource metrics such as machine's CPU, memory or network usage.

标志,指示Flink是否应该报告系统资源指标,如计算机的CPU、内存或网络使用情况。

metrics.system-resource-probing-interval

5000

Long

Interval between probing of system resource metrics specified in milliseconds. Has an effect only when 'metrics.system-resource' is enabled.

探测以毫秒为单位指定的系统资源度量之间的时间间隔。只有在启用了“metrics.system-resource”时才会产生效果。

调试和专家调整

下面的选项是为专家用户和修复/调试问题准备的。大多数设置不应该需要配置这些选项。

Class Loading 类加载

Flink dynamically loads the code for jobs submitted to a session cluster. In addition, Flink tries to hide many dependencies in the classpath from the application. This helps to reduce dependency conflicts between the application code and the dependencies in the classpath.

Please refer to the Debugging Classloading Docs for details.

Flink会动态地加载提交到会话集群的作业的代码。此外,Flink还试图从应用程序中隐藏类路径中的许多依赖项。这有助于减少应用程序代码和类路径中的依赖项之间的依赖关系冲突。

详情请参阅调试类加载文档。

Key

Default

Type

Description

classloader.check-leaked-classloader

true

Boolean

Fails attempts at loading classes if the user classloader of a job is used after it has terminated. This is usually caused by the classloader being leaked by lingering threads or misbehaving libraries, which may also result in the classloader being used by other jobs. This check should only be disabled if such a leak prevents further jobs from running.

如果作业的用户类加载程序在其终止后使用了该类,则尝试加载类失败。这通常是由于类加载程序被遗留的线程或行为不当的库泄漏所致,这也可能导致类加载程序被其他作业使用。只有在此类泄漏阻止其他作业运行时,才应禁用此检查。

classloader.fail-on-metaspace-oom-error

true

Boolean

Fail Flink JVM processes if 'OutOfMemoryError: Metaspace' is thrown while trying to load a user code class.

如果在试图加载用户代码类时抛出了“超出内存错误:元空间”,则FlinkJVM进程失败。

classloader.parent-first-patterns.additional

(none)

String

A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to "classloader.parent-first-patterns.default".

(分号分隔的模式列表,指定应该首先通过父类加载器解析哪些类。模式是一个与完全限定的类名进行检查的简单前缀。这些模式被附加到“classloader.parent-first-patterns.default”中。

classloader.parent-first-patterns.default

"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c"

String

A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use "classloader.parent-first-patterns.additional" instead.(分号分隔的模式列表,指定应该首先通过父类加载器解析哪些类。模式是一个与完全限定的类名进行检查的简单前缀。通常不应该修改此设置。要添加另一种模式,我们建议使用“classloader.parent-first-patterns.additional”来代替。

classloader.resolve-order

"child-first"

String

Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).

在从用户代码加载类时定义类解析策略,这意味着是首先检查用户代码罐(“子优先”)还是应用程序类路径(“父优先”)。默认设置表示首先从用户代码jar加载类,这意味着用户代码罐可以包含和加载与Flink使用的不同的依赖项(过渡)。

调试的高级选项

Key

Default

Type

Description

jmx.server.port

(none)

String

The port range for the JMX server to start the registry. The port config can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234".
This option overrides metrics.reporter.*.port option.

JMX服务器启动注册表的端口范围。端口配置可以是单个端口:“9123”,端口范围:“50100-50200”,或范围和端口列表:“50100-50200、50300-50400、51234”。此选项将覆盖metrics.reporter。*。端口选项。

高级状态选项

Key

Default

Type

Description

state.backend.async

true

Boolean

Option whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or only support asynchronous snapshots, and ignore this option.

选择状态后端是否应该在可能和可配置地使用异步快照方法。某些状态后端可能不支持异步快照,或只支持异步快照,并忽略此选项。

state.backend.fs.memory-threshold

20 kb

MemorySize

The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.状态数据文件的最小大小。所有小于这些状态块的状态块都内联存储在根检查点元数据文件中。此配置的最大内存阈值为1MB。

state.backend.fs.write-buffer-size

4096

Integer

The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'state.backend.fs.memory-threshold'.

为写入文件系统的检查点流提供的写缓冲区的默认大小。实际写缓冲区大小被确定为此选项和选项“state.backend.fs.memory-threshold”值的最大值。

高级RocksDB状态后端选项

Advanced options to tune RocksDB and RocksDB checkpoints.

Key

Default

Type

Description

state.backend.rocksdb.checkpoint.transfer.thread.num

1

Integer

The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.

用于在RocksDB状态后端中传输(下载和上载)文件的线程数(每个有状态操作符)。

state.backend.rocksdb.localdir

(none)

String

The local directory (on the TaskManager) where RocksDB puts its files.

RocksDB放置其文件的本地目录(在任务管理器上)。

state.backend.rocksdb.options-factory

"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"

String

The options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, and it would read the configured options which provided in 'RocksDBConfigurableOptions'.

为RocksDB创建db选项和列成员选项的选项工厂类。默认的选项工厂是org.apache.flink.contrib.streaming.state。默认可配置选项工厂,它将读取在‘RocksDBConfigurableOptions’。中提供的配置选项

state.backend.rocksdb.predefined-options

"DEFAULT"

String

The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.

由Flink社区提供的RocksDBDB选项和列家庭选项的预定义设置。当前支持的候选预定义选项是默认的、SPINNING_DISK_OPTIMIZED、SPINNING_DISK_OPTIMIZED_HIGH_MEM或FLASH_SSD_OPTIMIZED。请注意,用户从Rocksdbops工厂中获得的自定义选项和选项将应用在这些预定义的选项之上。

RocksDB Configurable Options RocksDB可配置选项

These options give fine-grained control over the behavior and resoures of ColumnFamilies. With the introduction of state.backend.rocksdb.memory.managed and state.backend.rocksdb.memory.fixed-per-slot (Apache Flink 1.10), it should be only necessary to use the options here for advanced performance tuning. These options here can also be specified in the application program via RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory).

这些选项提供了对列族的行为和返回的细粒度控制。随着state.backend.rocksdb.memory.managed和state.backend.rocksdb.memory.fixed-per-slot(ApacheFlink1.10)的引入,应该只需要使用这里的选项来进行高级性能调优。这里的这些选项也可以通过RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)。在应用程序中指定

Key

Default

Type

Description

state.backend.rocksdb.block.blocksize

(none)

MemorySize

The approximate size (in bytes) of user data packed per block. RocksDB has default blocksize as '4KB'.

每个块打包的用户数据的近似大小(以字节为单位)。RocksDB的默认块大小为“4KB”

state.backend.rocksdb.block.cache-size

(none)

MemorySize

The amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as '8MB'.

RocksDB中的数据块的缓存量。RocksDB的默认块缓存大小为“8MB”。

state.backend.rocksdb.compaction.level.max-size-level-base

(none)

MemorySize

The upper-bound of the total size of level base files in bytes. RocksDB has default configuration as '256MB'.

以字节为单位的级别基础文件总大小的上限。RocksDB的默认配置为“256MB”。

state.backend.rocksdb.compaction.level.target-file-size-base

(none)

MemorySize

The target file size for compaction, which determines a level-1 file size. RocksDB has default configuration as '64MB'.

用于压缩的目标文件的大小,它决定了1级文件的大小。RocksDB的默认配置为“64MB”

state.backend.rocksdb.compaction.level.use-dynamic-size

(none)

Boolean

If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. RocksDB has default configuration as 'false'. For more information, please refer to RocksDB's doc.

如果为真,RocksDB将动态选择每个级别的目标大小。从一个空的数据库中,RocksDB将使最后一个级别作为底层,这意味着将L0数据合并到最后一个级别,直到它超过max_bytes_for_level_base。然后重复这个过程进入最后一级,等等。RocksDB的默认配置为“false”。有关更多信息,请参阅RocksDB的文档

state.backend.rocksdb.compaction.style

(none)

Enum

Possible values: [LEVEL, UNIVERSAL, FIFO]

The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO or UNIVERSAL, and RocksDB choose 'LEVEL' as default style.

为数据库指定的压缩样式。候选压缩样式是级别、FIFO或通用样式,RocksDB选择“级别”作为默认样式。

state.backend.rocksdb.files.open

(none)

Integer

The maximum number of open files (per TaskManager) that can be used by the DB, '-1' means no limit. RocksDB has default configuration as '-1'.

数据库可使用的最大打开文件数(每个任务管理器),“-1”表示没有限制。RocksDB的默认配置为“-1”。

state.backend.rocksdb.thread.num

(none)

Integer

The maximum number of concurrent background flush and compaction jobs (per TaskManager). RocksDB has default configuration as '1'.

并发后台刷新和压缩作业的最大数量(每个任务管理器)。RocksDB的默认配置为“1”。

state.backend.rocksdb.write-batch-size

2 mb

MemorySize

The max size of the consumed memory for RocksDB batch write, will flush just based on item count if this config set to 0.

如果此配置设置为0,RocksDB批处理写入的所用内存的最大大小将仅根据项目计数进行刷新

state.backend.rocksdb.writebuffer.count

(none)

Integer

The maximum number of write buffers that are built up in memory. RocksDB has default configuration as '2'.

在内存中建立的最大写缓冲区数。RocksDB的默认配置为“2”。

state.backend.rocksdb.writebuffer.number-to-merge

(none)

Integer

The minimum number of write buffers that will be merged together before writing to storage. RocksDB has default configuration as '1'.

在写入存储之前将合并在一起的最小写入缓冲区数。RocksDB的默认配置为“1”。

state.backend.rocksdb.writebuffer.size

(none)

MemorySize

The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. RocksDB has default writebuffer size as '64MB'.

在转换为已排序的磁盘上文件之前,在内存中建立的数据量(由磁盘上未排序的日志支持)。RocksDB的默认写缓冲区大小为“64MB”

Advanced Fault Tolerance Options 高级故障容忍度选项

These parameters can help with problems related to failover and to components erroneously considering each other as failed.

这些参数可以帮助解决与故障转移和错误地相互认为是失败的组件相关的问题。

Key

Default

Type

Description

cluster.io-pool.size

(none)

Integer

The size of the IO executor pool used by the cluster to execute blocking IO operations (Master as well as TaskManager processes). By default it will use 4 * the number of CPU cores (hardware contexts) that the cluster process has access to. Increasing the pool size allows to run more IO operations concurrently.

集群用于执行阻塞IO操作(主进程和任务管理器进程)的IO执行器池的大小。默认情况下,它将使用4*集群进程可以访问的CPU核心(硬件上下文)的数量。增加池的大小允许同时运行更多的IO操作。

cluster.registration.error-delay

10000

Long

The pause made after an registration attempt caused an exception (other than timeout) in milliseconds.

注册尝试后进行的暂停会在毫秒内导致异常(超时除外)

cluster.registration.initial-timeout

100

Long

Initial registration timeout between cluster components in milliseconds.

集群组件之间的初始注册超时,以毫秒为单位。

cluster.registration.max-timeout

30000

Long

Maximum registration timeout between cluster components in milliseconds.

集群组件之间的最大注册超时,单位为毫秒。

cluster.registration.refused-registration-delay

30000

Long

The pause made after the registration attempt was refused in milliseconds.

在注册尝试后进行的暂停在毫秒内被拒绝。

cluster.services.shutdown-timeout

30000

Long

The shutdown timeout for cluster services like executors in milliseconds.

诸如执行器等集群服务的关闭超时,单位为毫秒。

heartbeat.interval

10000

Long

Time interval for requesting heartbeat from sender side.

从发送方端请求心跳的时间间隔。

heartbeat.timeout

50000

Long

Timeout for requesting and receiving heartbeat for both sender and receiver sides.

为发送方和接收方两侧请求和接收心跳的超时。

jobmanager.execution.failover-strategy

"region"

String

This option specifies how the job computation recovers from task failures. Accepted values are:

  • 'full': Restarts all tasks to recover the job.
  • 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.

此选项指定作业计算如何从任务失败中恢复。已接受的值为:

“满”:重新启动所有任务以恢复作业。

“区域”:重新启动可能受到任务失败影响的所有任务。更多的细节可以在这里找到。

Advanced Cluster Options

Key

Default

Type

Description

cluster.processes.halt-on-fatal-error

false

Boolean

Whether processes should halt on fatal errors instead of performing a graceful shutdown. In some environments (e.g. Java 8 with the G1 garbage collector), a regular graceful shutdown can lead to a JVM deadlock. See FLINK-16510 for details.

进程是否应该在致命错误时停止,而不是执行优雅的关闭。在某些环境中(例如带有G1垃圾收集器的Java8),定期优雅的优雅关闭会导致JVM死锁。详情请参见FLINK-16510。

Advanced Scheduling Options 高级计划选项

这些参数可以帮助针对特定情况进行微调调度。

Key

Default

Type

Description

cluster.evenly-spread-out-slots

false

Boolean

Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available TaskExecutors.

启用插槽扩展分配策略。该策略试图将插槽均匀地分配到所有可用的任务执行者中。

slot.idle.timeout

50000

Long

The timeout in milliseconds for a idle slot in Slot Pool.

插槽池中的空闲插槽的超时时间,以毫秒为单位。

slot.request.timeout

300000

Long

The timeout in milliseconds for requesting a slot from Slot Pool.

从插槽池请求插槽的超时时间,以毫秒为单位。

slotmanager.number-of-slots.max

2147483647

Integer

Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.

定义Flink集群分配的最大插槽数。此配置选项旨在限制批处理工作负载的资源消耗。不建议为流媒体工作负载配置此选项,如果没有足够的插槽,它可能会失败。请注意,此配置选项对独立集群无效,其中分配了多少插槽不由Flink控制。

高级的高可用性选项

Key

Default

Type

Description

high-availability.jobmanager.port

"0"

String

The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port'.A value of '0' means that a random free port is chosen. TaskManagers discover this port through the high-availability services (leader election), so a random port or a port range works without requiring any additional means of service discovery.

Flink主服务器在高可用性设置中用于RPC连接的端口(范围)。在高可用性的设置中,会使用这个值来代替“jobmanager.rpc.port”。值“0”表示选择了一个随机的*端口。任务管理器通过高可用性服务(领导者选择)发现此端口,因此随机端口或端口范围可以工作,而不需要任何额外的服务发现方法。

Advanced High-availability ZooKeeper Options 高级的高可用性的ZooKeeper选项

Key

Default

Type

Description

high-availability.zookeeper.client.acl

"open"

String

Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).

定义要在ZK节点上配置的ACL(打开的|创建者)。如果ZooKeeper服务器配置将“授权提供者”属性映射以使用SASLAuthenticationProvider,并且集群配置为以安全模式(Kerbereros)运行,则配置值可以设置为“创建者”。

high-availability.zookeeper.client.connection-timeout

15000

Integer

Defines the connection timeout for ZooKeeper in ms.

在ms内定义ZooKeeper的连接超时。

high-availability.zookeeper.client.max-retry-attempts

3

Integer

Defines the number of connection retries before the client gives up.

定义在客户端放弃之前的连接重试次数。

high-availability.zookeeper.client.retry-wait

5000

Integer

Defines the pause between consecutive retries in ms.

定义在ms中连续重试之间的暂停。

high-availability.zookeeper.client.session-timeout

60000

Integer

Defines the session timeout for the ZooKeeper session in ms.

以ms为单位定义ZooKeeper会话的会话超时。

high-availability.zookeeper.path.checkpoint-counter

"/checkpoint-counter"

String

ZooKeeper root path (ZNode) for checkpoint counters.

检查点计数器的ZooKeeper根路径(ZNode)。

high-availability.zookeeper.path.checkpoints

"/checkpoints"

String

ZooKeeper root path (ZNode) for completed checkpoints.

已完成的检查点的ZooKeeper根路径(ZNode)。

high-availability.zookeeper.path.jobgraphs

"/jobgraphs"

String

ZooKeeper root path (ZNode) for job graphs

作业图的ZooKeeper根路径(ZNode)

high-availability.zookeeper.path.latch

"/leaderlatch"

String

Defines the znode of the leader latch which is used to elect the leader.

定义用于选择领导者的领导者锁存的znode。

high-availability.zookeeper.path.leader

"/leader"

String

Defines the znode of the leader which contains the URL to the leader and the current leader session ID.

定义包含URL的领导的znode和当前领导会话ID。

high-availability.zookeeper.path.mesos-workers

"/mesos-workers"

String

The ZooKeeper root path for persisting the Mesos worker information.

用于持久化Mesos工作信息的ZooKeeper根路径。

high-availability.zookeeper.path.running-registry

"/running_job_registry/"

String

Advanced Options for the REST endpoint and Client 针对REST端点和客户端的高级选项

Key

Default

Type

Description

rest.await-leader-timeout

30000

Long

The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint

客户机等待领导地址的时间,例如,调度程序或网络监控或端点

rest.client.max-content-length

104857600

Integer

The maximum content length in bytes that the client will handle.

客户端将处理的最大字节内容长度。

rest.connection-timeout

15000

Long

The maximum time in ms for the client to establish a TCP connection.

客户端建立TCP连接的最大时间为ms

rest.idleness-timeout

300000

Long

The maximum time in ms for a connection to stay idle before failing.

连接在失败前保持空闲状态的最大时间,以毫秒为单位。

rest.retry.delay

3000

Long

The time in ms that the client waits between retries (See also `rest.retry.max-attempts`).

客户端在重试之间等待的毫秒时间(请参见`rest.retry.max-attempts`)。

rest.retry.max-attempts

20

Integer

The number of retries the client will attempt if a retryable operations fails.

如果可重试操作失败,客户端将尝试的重试次数。

rest.server.max-content-length

104857600

Integer

The maximum content length in bytes that the server will handle.

服务器将处理的最大内容长度为字节。

rest.server.numThreads

4

Integer

The number of threads for the asynchronous processing of requests.

用于异步处理请求的线程数。

rest.server.thread-priority

5

Integer

Thread priority of the REST server's executor for processing asynchronous requests. Lowering the thread priority will give Flink's main components more CPU time whereas increasing will allocate more time for the REST server's processing.

REST服务器的执行器用来处理异步请求的线程优先级。降低线程优先级将给Flink的主要组件更多的CPU时间,而增加将为REST服务器的处理分配更多的时间。

Advanced Options for Flink Web UI FlinkWebUI的高级选项

Key

Default

Type

Description

web.access-control-allow-origin

"*"

String

Access-Control-Allow-Origin header for all responses from the web-frontend.

访问控制-允许来自网络前端的所有响应的起源标题。

web.backpressure.cleanup-interval

600000

Integer

Time, in milliseconds, after which cached stats are cleaned up if not accessed.

时间,以毫秒为单位,之后如果不访问缓存,则清理缓存统计数据。

web.backpressure.delay-between-samples

50

Integer

Delay between samples to determine back pressure in milliseconds.

样品之间的延迟,以毫秒为单位确定背压力。

web.backpressure.num-samples

100

Integer

Number of samples to take to determine back pressure.

要确定反向压力的样品数量。

web.backpressure.refresh-interval

60000

Integer

Time, in milliseconds, after which available stats are deprecated and need to be refreshed (by resampling).

时间,以毫秒为单位,之后可用的统计数据被弃用,需要刷新(通过重新采样)

web.checkpoints.history

10

Integer

Number of checkpoints to remember for recent history.

最近历史上需要记住的检查点的数量。

web.history

5

Integer

Number of archived jobs for the JobManager.

工作管理器的存档作业数。

web.log.path

(none)

String

Path to the log file (may be in /log for standalone but under log directory when using YARN).

日志文件的路径(可以独立登录,但在使用YARN时可能处于日志目录下)

web.refresh-interval

3000

Long

Refresh interval for the web-frontend in milliseconds.

网络前端的刷新间隔以毫秒为单位。

web.submit.enable

true

Boolean

Flag indicating whether jobs can be uploaded and run from the web-frontend.

web.timeout

600000

Long

Timeout for asynchronous operations by the web monitor in milliseconds.

Web监视器的异步操作超时时间为毫秒。

web.tmpdir

System.getProperty("java.io.tmpdir")

String

Flink web directory which is used by the webmonitor.

链接web监视器使用的web目录。

web.upload.dir

(none)

String

Directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.

用于上载作业罐子的目录。如果未指定,将在JOB_MANAGER_WEB_TMPDIR_KEY指定的目录下使用动态目录。

Full JobManager Options 完整的Job管理器选项

JobManager

Key

Default

Type

Description

jobmanager.archive.fs.dir

(none)

String

Dictionary for JobManager to store the archives of completed jobs.

供作业管理员存储已完成作业的档案的字典。

jobmanager.execution.attempts-history-size

16

Integer

The maximum number of prior execution attempts kept in history.

历史记录中保存的先前执行尝试的最大次数。

jobmanager.execution.failover-strategy

"region"

String

This option specifies how the job computation recovers from task failures. Accepted values are:

  • 'full': Restarts all tasks to recover the job.
  • 'region': Restarts all tasks that could be affected by the task failure. More details can be found here.

此选项指定作业计算如何从任务失败中恢复。已接受的值为:

“满”:重新启动所有任务以恢复作业。

“区域”:重新启动可能受到任务失败影响的所有任务。更多的细节可以在这里找到。

jobmanager.retrieve-taskmanager-hostname

true

Boolean

Flag indicating whether JobManager would retrieve canonical host name of TaskManager during registration. If the option is set to "false", TaskManager registration with JobManager could be faster, since no reverse DNS lookup is performed. However, local input split assignment (such as for HDFS files) may be impacted.

指示作业管理器是否会在注册过程中检索任务管理器的规范主机名的标志。如果该选项设置为“false”,那么任务管理器注册可以更快,因为不执行反向DNS查找。但是,本地输入分割分配(例如对于HDFS文件)可能会受到影响。

jobmanager.rpc.address

(none)

String

The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.

配置参数,定义要连接到以便与作业管理器通信的网络地址。此值仅在存在具有静态名称或地址的单个作业管理器的设置(简单的独立设置,或具有动态服务名称解析的容器设置)中进行解释。它在许多高可用性设置中没有使用,当使用*选举服务(如ZooKeeper)从潜在的多个备用工作经理中选择和发现工作经理*时。

jobmanager.rpc.port

6123

Integer

The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.

配置参数,定义要连接到以与作业管理器通信的网络端口。与jobmanager.rpc.address一样,这个值只在存在具有静态名称/地址和端口的单一Job管理器的设置(简单的独立设置,或具有动态服务名称解析的容器设置)中进行解释。此配置选项不用于许多高可用性设置,当使用领导者选举服务(如ZooKeeper)从潜在的多个备用作业管理器中选择和发现作业管理器领导者时。

jobstore.cache-size

52428800

Long

The job store cache size in bytes which is used to keep completed jobs in memory.

作业以字节存储的缓存大小,用于在内存中保存已完成的作业。

jobstore.expiration-time

3600

Long

The time in seconds after which a completed job expires and is purged from the job store.

完成的作业过期并从作业存储中清除的秒内时间。

jobstore.max-capacity

2147483647

Integer

The max number of completed jobs that can be kept in the job store.

可以存储在作业存储中的已完成作业的最大数量。

Blob Server

The Blob Server is a component in the JobManager. It is used for distribution of objects that are too large to be attached to a RPC message and that benefit from caching (like Jar files or large serialized code objects).

Blob服务器是作业管理器中的一个组件。它用于分发那些太大而无法连接到RPC消息和来自缓存的对象(如Jar文件或大型序列化代码对象)。

Key

Default

Type

Description

blob.client.connect.timeout

0

Integer

The connection timeout in milliseconds for the blob client.

blob客户端的连接超时,以毫秒为单位。

blob.client.socket.timeout

300000

Integer

The socket timeout in milliseconds for the blob client.

blob客户端的套接字超时,以毫秒为单位。

blob.fetch.backlog

1000

Integer

The config parameter defining the backlog of BLOB fetches on the JobManager.

定义作业管理器上BLOB获取的积压的配置参数。

blob.fetch.num-concurrent

50

Integer

The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.

定义作业管理器服务的并发BLOB获取的最大数量的配置参数。

blob.fetch.retries

5

Integer

The config parameter defining number of retires for failed BLOB fetches.

定义为失败的BLOB获取程序提供的退出次数的配置参数。

blob.offload.minsize

1048576

Integer

The minimum size for messages to be offloaded to the BlobServer.

要卸载到博客服务器的消息的最小大小。

blob.server.port

"0"

String

The config parameter defining the server port of the blob service.

定义blob服务的服务器端口的配置参数。

blob.service.cleanup.interval

3600

Long

Cleanup interval of the blob caches at the task managers (in seconds).

清除任务管理器处的阻塞性阻塞缓存的时间间隔(以秒为单位)。

blob.service.ssl.enabled

true

Boolean

Flag to override ssl support for the blob service transport.

要覆盖对blob服务传输的ssl支持的标志。

blob.storage.directory

(none)

String

The config parameter defining the storage directory to be used by the blob server.

定义blob服务器要使用的存储目录的配置参数。

ResourceManager

These configuration keys control basic Resource Manager behavior, independent of the used resource orchestration management framework (YARN, Mesos, etc.)

这些配置密钥控制基本的资源管理器行为,独立于所使用的资源编排管理框架(YARN、Mesos等)。

Key

Default

Type

Description

resourcemanager.job.timeout

"5 minutes"

String

Timeout for jobs which don't have a job manager as leader assigned.

没有分配工作经理作为领导的工作的超时。

resourcemanager.rpc.port

0

Integer

Defines the network port to connect to for communication with the resource manager. By default, the port of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges.

定义要连接到以便与资源管理器通信的网络端口。默认情况下,作业管理器的端口,因为使用相同的操作系统。无法使用此配置密钥来定义端口范围。

resourcemanager.standalone.start-up-time

-1

Long

Time in milliseconds of the start-up period of a standalone cluster. During this time, resource manager of the standalone cluster expects new task executors to be registered, and will not fail slot requests that can not be satisfied by any current registered slots. After this time, it will fail pending and new coming requests immediately that can not be satisfied by registered slots. If not set, slot.request.timeout will be used by default.

独立集群启动期间的时间,以毫秒为单位。在此期间,独立集群的资源管理器希望注册新的任务执行器,并且不会失败任何当前注册的槽无法满足的槽请求。在此之后,它将失败和注册槽无法满足的新请求。如果未设置,则将默认使用slot.request.timeout。

resourcemanager.taskmanager-timeout

30000

Long

The timeout for an idle task manager to be released.

要释放的空闲任务管理器的超时时间

slotmanager.number-of-slots.max

2147483647

Integer

Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.

定义Flink集群分配的最大插槽数。此配置选项旨在限制批处理工作负载的资源消耗。不建议为流媒体工作负载配置此选项,如果没有足够的插槽,它可能会失败。请注意,此配置选项对独立集群无效,其中分配了多少插槽不由Flink控制。

slotmanager.redundant-taskmanager-num

0

Integer

The number of redundant task managers. Redundant task managers are extra task managers started by Flink, in order to speed up job recovery in case of failures due to task manager lost. Note that this feature is available only to the active deployments (native K8s, Yarn and Mesos).

冗余任务管理器的数量。冗余任务管理器是由Flink启动的额外任务管理器,以便在由于任务管理器丢失而失败时加快作业恢复。请注意,此功能仅可用于活动部署(本机K8、Yarn和Mesos)。

Full TaskManagerOptions

Key

Default

Type

Description

task.cancellation.interval

30000

Long

Time interval between two successive task cancellation attempts in milliseconds.

连续两次任务取消尝试之间的时间间隔,单位为毫秒。

task.cancellation.timeout

180000

Long

Timeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog.

超时为毫秒,之后任务取消超时并导致致命的任务管理器错误。值为0将使看门狗失效。

task.cancellation.timers.timeout

7500

Long

Time we wait for the timers in milliseconds to finish all pending timer threads when the stream task is cancelled.

当流任务被取消时,我们等待计时器在毫秒内完成所有等待的计时器线程。

taskmanager.data.port

0

Integer

The task manager’s external port used for data exchange operations.

任务管理器用于数据交换操作的外部端口。

taskmanager.data.ssl.enabled

true

Boolean

Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true

启用对任务管理器数据传输的SSL支持。这仅适用于将内部SSL(security.ssl.internal.enabled)的全局标志设置为true时

taskmanager.debug.memory.log

false

Boolean

Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.

指示是否启动一个线程的标志,它会重复记录JVM的内存使用情况。

taskmanager.debug.memory.log-interval

5000

Long

The interval (in ms) for the log thread to log the current memory usage.

日志线程记录当前内存使用情况的间隔(ms)。

taskmanager.host

(none)

String

The external address of the network interface where the TaskManager is exposed. Because different TaskManagers need different values for this option, usually it is specified in an additional non-shared TaskManager-specific config file.

公开任务管理器的网络接口的外部地址。由于不同的任务管理器对此选项需要不同的值,因此通常会在附加的非共享的特定于任务管理器的配置文件中指定它。

taskmanager.jvm-exit-on-oom

false

Boolean

Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.

是否在任务线程抛出外部内存错误时,杀死任务管理器。

taskmanager.memory.segment-size

32 kb

MemorySize

Size of memory buffers used by the network stack and the memory manager.

taskmanager.network.bind-policy

"ip"

String

The automatic address binding policy used by the TaskManager if "taskmanager.host" is not set. The value should be one of the following:

  • "name" - uses hostname as binding address
  • "ip" - uses host's ip address as binding address

如果未设置“taskmanager.host”,则任务管理器将使用的自动地址绑定策略。该值应为以下各项值之一:

“name”-使用主机名作为绑定地址

“ip”-使用主机的ip地址作为绑定地址

网络堆栈和内存管理器所使用的内存缓冲区的大小。

taskmanager.numberOfTaskSlots

1

Integer

The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).

单个任务管理器可以运行的并行操作符或用户函数实例数。如果此值大于1,则单个任务管理器将接收一个函数或操作符的多个实例。这样,任务管理器就可以利用多个CPU核心,但同时,可用的内存会被分配到不同的操作员或函数实例之间。这个值通常与任务管理器的计算机所拥有的物理CPU核数成正比(例如,等于核数,或核数的一半)。

taskmanager.registration.timeout

5 min

Duration

Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.

定义任务管理器注册的超时时间。如果没有注册成功而超过了持续时间,则任务管理器将终止。

taskmanager.resource-id

(none)

String

The TaskManager's ResourceID. If not configured, the ResourceID will be generated with the "RpcAddress:RpcPort" and a 6-character random string. Notice that this option is not valid in Yarn / Mesos and Native Kubernetes mode.

任务经理的资源ID。如果未配置,资源ID将使用“RpcAdrses:RpcPort”和一个6个字符的随机字符串生成。请注意,此选项在纱线/台面和原生铜板模式下无效。

taskmanager.rpc.port

"0"

String

The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

公开任务管理器的外部RPC端口。接受端口列表(“50100、50101”)、范围(“50100-50200”)或两者的组合。建议设置多个端口管理器的范围,以避免在同一台机器上运行时的端口发生冲突。

Data Transport Network Stack 数据传输网络堆栈

These options are for the network stack that handles the streaming and batch data exchanges between TaskManagers.

这些选项用于处理任务管理器之间的流媒体和批处理数据交换的网络堆栈。

Key

Default

Type

Description

taskmanager.network.blocking-shuffle.compression.enabled

false

Boolean

Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when data compression ratio is high. Currently, shuffle data compression is an experimental feature and the config option can be changed in the future.

布尔标志,表示是否将压缩洗牌数据以阻止洗牌模式。请注意,每个缓冲区压缩数据,压缩会产生额外的CPU开销,因此当数据压缩比高时,对于IO有界的情况更有效。目前,洗牌数据压缩是一个实验特性,配置选项可以在未来进行改变。

taskmanager.network.blocking-shuffle.type

"file"

String

The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the property type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once memory exceeding some threshold. Also note that this option is experimental and might be changed future.

阻塞洗牌类型,或“mmap”或“文件”。“自动”是指基于系统内存架构自动选择属性类型(mmap为64位,文件为32位)。注意,mmap的内存使用不是由配置的内存限制考虑的,但是一些资源框架,如纱线会跟踪这个内存使用情况,并在内存超过某些阈值时杀死容器。还要注意,这个选项是实验性的,将来可能会改变。

taskmanager.network.detailed-metrics

false

Boolean

Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.

启用布尔标志以启用/禁用有关入站/出站网络队列长度的更详细指标的布尔标志。

taskmanager.network.memory.buffers-per-channel

2

Integer

Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.

在基于信贷的流量控制模型中,要为每个传出/传入通道(子分区/输入通道)使用的专用网络缓冲区的数量。它应至少配置2个,以便性能良好。1个缓冲区用于接收子分区中的飞行数据,1个缓冲区用于并行序列化。

taskmanager.network.memory.floating-buffers-per-gate

8

Integer

Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.

要用于每个输出/输入门(结果分区/输入门)的额外网络缓冲区数。在基于信用的流量控制模式中,这表明在所有输入通道*享多少浮动信用。浮动缓冲区是基于积压(子分区中的实时输出缓冲区)反馈来分布的,可以帮助缓解子分区之间数据分布不平衡造成的反向压力。当节点之间的往返时间更高和/或集群中的机器数量更多时,应增加此值。

taskmanager.network.memory.max-buffers-per-channel

10

Integer

Number of max buffers that can be used for each channel. If a channel exceeds the number of max buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed, and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer producing large amount of data.

每个通道可使用的最大缓冲区数。如果一个通道超过了最大缓冲区的数量,它将使任务不可用,导致反向压力,并阻止数据处理。这可能会防止数据倾斜和大量配置的浮动缓冲区时的过度增长,从而加快检查点对齐。这个限制并不是严格保证的,并且可以被平面图操作符、跨越多个缓冲区的记录或产生大量数据的单个计时器等东西所忽略。

taskmanager.network.netty.client.connectTimeoutSec

120

Integer

The Netty client connection timeout.

Netty客户端连接超时。

taskmanager.network.netty.client.numThreads

-1

Integer

The number of Netty client threads.

Netty客户端线程的数量。

taskmanager.network.netty.num-arenas

-1

Integer

The number of Netty arenas.

内蒂场地的数量。

taskmanager.network.netty.sendReceiveBufferSize

0

Integer

The Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.

Netty发送和接收缓冲区大小。这默认为系统缓冲区大小(cat/proc/sys/net/ipv4/tcp_[rw]mem),并且在现代Linux中为4MiB。

taskmanager.network.netty.server.backlog

0

Integer

The netty server connection backlog.

netty服务器连接积压。

taskmanager.network.netty.server.numThreads

-1

Integer

The number of Netty server threads.

Netty服务器线程的数量。

taskmanager.network.netty.transport

"auto"

String

The Netty transport type, either "nio" or "epoll". The "auto" means selecting the property mode automatically based on the platform. Note that the "epoll" mode can get better performance, less GC and have more advanced features which are only available on modern Linux.

Netty传输类型,或“nio”或“epoll”。“自动”是指根据平台自动选择属性模式。请注意,“epoll”模式可以获得更好的性能,更少的GC和更高级的功能,这只能在现代Linux上可用。

taskmanager.network.request-backoff.initial

100

Integer

Minimum backoff in milliseconds for partition requests of input channels.

对输入通道的分区请求的最小回调,单位为毫秒。

taskmanager.network.request-backoff.max

10000

Integer

Maximum backoff in milliseconds for partition requests of input channels.

对于输入通道的分区请求的最大回退时间,以毫秒为单位。

taskmanager.network.retries

0

Integer

The number of retry attempts for network communication. Currently it's only used for establishing input/output channel connections

对网络通信的重试尝试的次数。目前仅用于建立输入/输出通道连接

taskmanager.network.sort-shuffle.min-buffers

64

Integer

Minimum number of network buffers required per sort-merge blocking result partition. For large scale batch jobs, it is suggested to increase this config value to improve compression ratio and reduce small network packets. Note: to increase this config value, you may also need to increase the size of total network memory to avoid "insufficient number of network buffers" error.

每个排序合并阻塞结果分区所需的最小网络缓冲区数。对于大规模的批处理作业,建议增加该配置值,以提高压缩比,减少小的网络数据包。注意:要增加此配置值,您可能还需要增加网络总内存的大小,以避免“网络缓冲区数量不足”的错误。

taskmanager.network.sort-shuffle.min-parallelism

2147483647

Integer

Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for small parallelism, hash-based blocking shuffle will be used and for large parallelism, sort-merge blocking shuffle will be used. Note: sort-merge blocking shuffle uses unmanaged direct memory for shuffle data writing and reading so just increase the size of direct memory if direct memory OOM error occurs.

在排序合并阻塞洗牌和默认的基于哈希的屏蔽洗牌之间切换的并行阈值,这意味着对于小并行,将使用基于哈希的屏蔽洗牌,对于大型并行,将使用排序合并阻塞洗牌。注意:排序合并阻塞洗牌使用非托管的直接内存进行洗牌数据写入和读取,所以如果发生直接内存OOM错误,只需增加直接内存的大小。

RPC / Akka

Flink uses Akka for RPC between components (JobManager/TaskManager/ResourceManager). Flink does not use Akka for data transport.

Flink使用Akka在组件之间进行RPC(作业管理器/任务管理器/资源管理器)。Flink不使用Akka进行数据传输。

Key

Default

Type

Description

akka.ask.callstack

true

Boolean

If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint.

如果为true,则会捕获异步请求的调用堆栈。这样,当一个请求失败时(例如超时)时,您将得到一个适当的异常,它将描述到原始的方法、调用和调用站点。请注意,如果有数百万个并发RPC调用,这可能会增加内存占用。

akka.ask.timeout

"10 s"

String

Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).

超时用于所有期货和阻止Akka呼叫。如果Flink因超时而失败,那么您应该尝试增加此值。超时可能是由于机器运行速度慢或网络拥挤造成的。超时值需要一个时间单位说明符(毫秒/分钟/小时/d)。

akka.client-socket-worker-pool.pool-size-factor

1.0

Double

The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.

池大小因子用于使用以下公式确定线程池大小:ceil(可用的处理器*因子)。然后,得到的大小以池大小最小值和池大小最大值为限制。

akka.client-socket-worker-pool.pool-size-max

2

Integer

Max number of threads to cap factor-based number to.

基于因数的上限的最大线程数。

akka.client-socket-worker-pool.pool-size-min

1

Integer

Min number of threads to cap factor-based number to.

限制基于因数的最小线程数。

akka.fork-join-executor.parallelism-factor

2.0

Double

The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.

并行性因子用于使用以下公式来确定线程池的大小:ceil(可用的处理器*因子)。然后得到的大小被并行最小值和并行最大值所限制。

akka.fork-join-executor.parallelism-max

64

Integer

Max number of threads to cap factor-based parallelism number to.

要限制基于因子的并行性数到的最大线程数。

akka.fork-join-executor.parallelism-min

8

Integer

Min number of threads to cap factor-based parallelism number to.

要限制基于因子的并行性数到的最小线程数。

akka.framesize

"10485760b"

String

Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.

作业管理器和任务管理器之间发送的邮件的最大大小。如果Flink因为消息超过此限制而失败,则应该增加它。消息大小需要一个大小单位指定符。

akka.jvm-exit-on-fatal-error

true

Boolean

Exit JVM on fatal Akka errors.

对致命的Akka错误退出JVM。

akka.log.lifecycle.events

false

Boolean

Turns on the Akka’s remote logging of events. Set this value to 'true' in case of debugging.

打开Akka的远程事件记录。在进行调试时,请将此值设置为“true”

akka.lookup.timeout

"10 s"

String

Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).

用于查找作业管理器的超时时间。超时值必须包含一个时间单位说明符(毫秒/分钟/小时/d)。

akka.retry-gate-closed-for

50

Long

Milliseconds a gate should be closed for after a remote connection was disconnected.

在远程连接断开后,门应关闭的毫秒。

akka.server-socket-worker-pool.pool-size-factor

1.0

Double

The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.

池大小因子用于使用以下公式确定线程池大小:ceil(可用的处理器*因子)。然后,得到的大小以池大小最小值和池大小最大值为限制。

akka.server-socket-worker-pool.pool-size-max

2

Integer

Max number of threads to cap factor-based number to.

基于因数的上限的最大线程数。

akka.server-socket-worker-pool.pool-size-min

1

Integer

Min number of threads to cap factor-based number to.

限制基于因数的最小线程数。

akka.ssl.enabled

true

Boolean

Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.

打开针对Akka的远程通信的SSL。这仅适用于将全局ssl标志security.ssl.enabled设置为true时。

akka.startup-timeout

(none)

String

Timeout after which the startup of a remote component is considered being failed.

认为远程组件启动失败的超时。

akka.tcp.timeout

"20 s"

String

Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value.

所有出站连接的超时。如果由于网络较慢而在连接到任务管理器时遇到问题,则应该增加此值。

akka.throughput

15

Integer

Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.

在将线程返回到池之前,正在分批处理的消息数。低值表示公平的调度,而高值可以以不公平为代价来提高性能。

akka.transport.heartbeat.interval

"1000 s"

String

Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d).

为Akka的传输故障探测器的心跳间隔。由于Flink使用TCP,所以不需要使用检测器。因此,通过将间隔设置为非常高的值来禁用检测器。如果您需要传输故障检测器,请将间隔设置为一些合理的值。间隔值需要一个时间单位指定符(ms/s/min/h/d)。

akka.transport.heartbeat.pause

"6000 s"

String

Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d).

Akka的传输故障检测器的可接受的心跳暂停。由于Flink使用TCP,所以不需要使用检测器。因此,通过将暂停设置为非常高的值来禁用检测器。如果您需要传输故障检测器,请将暂停设置为一些合理的值。暂停值需要一个时间单位指定符(毫秒/分钟/h/d)。

akka.transport.threshold

300.0

Double

Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.

传输故障检测器的阈值。由于Flink使用TCP,因此检测器没有必要,因此阈值设置为高值。

JVM and Logging Options JVM和日志记录选项

Key

Default

Type

Description

env.hadoop.conf.dir

(none)

String

Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via environment variable.

到双循环配置目录的路径。需要读取HDFS和/或YARN配置。您也可以通过环境变量来设置它。

env.hbase.conf.dir

(none)

String

Path to hbase configuration directory. It is required to read HBASE configuration. You can also set it via environment variable.

到hbase配置目录的路径。它需要读取HBASE配置。您也可以通过环境变量来设置它。

env.java.opts

(none)

String

Java options to start the JVM of all Flink processes with.

使用Java选项来启动所有Flink进程的JVM。

env.java.opts.client

(none)

String

Java options to start the JVM of the Flink Client with.

使用Java选项来启动JVM的FlinkClient

env.java.opts.historyserver

(none)

String

Java options to start the JVM of the HistoryServer with.

使用来启动历史服务器的JVM的Java选项。

env.java.opts.jobmanager

(none)

String

Java options to start the JVM of the JobManager with.

使用来启动作业管理器的JVM的Java选项。

env.java.opts.taskmanager

(none)

String

Java options to start the JVM of the TaskManager with.

用来启动任务管理器的JVM的Java选项。

env.log.dir

(none)

String

Defines the directory where the Flink logs are saved. It has to be an absolute path. (Defaults to the log directory under Flink’s home)

定义保存Flink日志的目录。这必须是一条绝对的路径。(默认设置为Flink主页下的日志目录)

env.log.max

5

Integer

The maximum number of old log files to keep.

要保存的最大旧日志文件的数量。

env.ssh.opts

(none)

String

Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).

当启动或停止作业管理器、任务管理器和动物管理员服务(start-cluster.sh、stop-cluster.sh、start-zookeeper-quorum.sh、stop-zookeeper-quorum.sh)时,将其他命令行选项将传递给SSH客户端。

env.yarn.conf.dir

(none)

String

Path to yarn configuration directory. It is required to run flink on YARN. You can also set it via environment variable.

到纱线配置目录的路径。它需要在院子里运行闪烁。您也可以通过环境变量来设置它。

Forwarding Environment Variables 转发环境变量

You can configure environment variables to be set on the JobManager and TaskManager processes started on Yarn/Mesos.

containerized.master.env.: Prefix for passing custom environment variables to Flink’s JobManager process. For example for passing LD_LIBRARY_PATH as an env variable to the JobManager, set containerized.master.env.LD_LIBRARY_PATH: “/usr/lib/native” in the flink-conf.yaml.

containerized.taskmanager.env.: Similar to the above, this configuration prefix allows setting custom environment variables for the workers (TaskManagers).

您可以将要设置为在纱线/台面上启动的作业管理器和任务管理器进程上设置的环境变量。

containerized.master.env.:将自定义环境变量传递给Flink的作业管理器进程的前缀。例如,要将LD_LIBRARY_PATH作为env变量传递给作业管理器,请设置containerized.master.env。LD_LIBRARY_PATH:flink-conf.yaml中的“/usr/lib/本机”。

containerized.taskmanager.env.:类似于上面的内容,此配置前缀允许为工人(任务管理器)设置自定义环境变量。

Backup

Client

Key

Default

Type

Description

client.retry-period

2 s

Duration

The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).

通过CLI或FLink的客户端执行命令失败之间的时间间隔(单位为毫秒),在支持重试的地方(默认为2秒)。

client.timeout

1 min

Duration

Timeout on the client side.

客户端的超时。

Execution

Key

Default

Type

Description

execution.attached

false

Boolean

Specifies if the pipeline is submitted in attached or detached mode.

指定管道是以附加的或分离的方式提交的。

execution.job-listeners

(none)

List<String>

Custom JobListeners to be registered with the execution environment. The registered listeners cannot have constructors with arguments.

要在执行环境中注册的自定义作业监听器。已注册的侦听器不能有带有参数的构造函数。

execution.shutdown-on-attached-exit

false

Boolean

If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.

如果作业以附加模式提交,则在CLI突然终止时执行最佳努力的集群关闭,例如,为了响应用户中断,例如键入Ctrl+C。

execution.target

(none)

String

The deployment target for the execution. This can take one of the following values:

  • remote
  • local
  • yarn-per-job
  • yarn-session
  • kubernetes-session

要执行的部署目标。这可以取以下值之一:.

Key

Default

Type

Description

execution.savepoint.ignore-unclaimed-state

false

Boolean

Allow to skip savepoint state that cannot be restored. Allow this if you removed an operator from your pipeline after the savepoint was triggered.

允许跳过无法恢复的保存点状态。如果在触发保存点后从管道中删除了一个操作员,请允许执行此操作。

execution.savepoint.path

(none)

String

Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

从中还原作业的路径(例如hdfs:///flink/保存点-1537)。

Key

Default

Type

Description

execution.buffer-timeout

100 ms

Duration

The maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can result in three logical modes:

  • A positive value triggers flushing periodically by that interval
  • 0 triggers flushing after every record thus minimizing latency
  • -1 ms triggers flushing only when the output buffer is full thus maximizing throughput

冲洗输出缓冲区的最大时间频率(毫秒)。默认情况下,输出缓冲区经常刷新,以提供低延迟,并帮助平稳的开发人员体验。设置参数可以产生三种逻辑模式:

正值通过该间隔周期性地触发刷新

0在每次记录后触发刷新,从而最小化延迟

-1ms仅在输出缓冲区满时触发刷新,从而最大化吞吐量

execution.checkpointing.snapshot-compression

false

Boolean

Tells if we should use compression for the state snapshot data or not

告知我们是否应该对状态快照数据使用压缩

execution.runtime-mode

STREAMING

Enum

Possible values: [STREAMING, BATCH, AUTOMATIC]

Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, network shuffle behavior, and time semantics.

数据流程序的运行时执行模式。此外,它还控制着任务调度、网络洗牌行为和时间语义。

Pipeline

Key

Default

Type

Description

pipeline.auto-generate-uids

true

Boolean

When auto-generated UIDs are disabled, users are forced to manually specify UIDs on DataStream applications.

It is highly recommended that users specify UIDs before deploying to production since they are used to match state in savepoints to operators in a job. Because auto-generated ID's are likely to change when modifying a job, specifying custom IDs allow an application to evolve over time without discarding state.

当自动生成的uid被禁用时,用户*在数据流应用程序上手动指定uid。强烈建议用户在部署到生产中之前指定uid,因为它们用于将保存点中的状态与作业中的操作员进行匹配。因为自动生成的ID在修改作业时可能会发生更改,所以指定自定义ID允许应用程序随着时间的推移而演化,而不丢弃状态。

pipeline.auto-type-registration

true

Boolean

Controls whether Flink is automatically registering all types in the user programs with Kryo.

控制Flink是否使用Kryo自动注册用户程序中的所有类型。

pipeline.auto-watermark-interval

0 ms

Duration

The interval of the automatic watermark emission. Watermarks are used throughout the streaming system to keep track of the progress of time. They are used, for example, for time based windowing.

自动水印发射的时间间隔。水印在整个流媒体系统中都被使用来跟踪时间的进展。例如,它们被用于基于时间的窗口。

pipeline.cached-files

(none)

List<String>

Files to be registered at the distributed cache under the given name. The files will be accessible from any user-defined function in the (distributed) runtime under a local path. Files may be local files (which will be distributed via BlobServer), or files in a distributed file system. The runtime will copy the files temporarily to a local cache, if needed.

Example:
name:file1,path:file:///tmp/file1;name:file2,path:hdfs:///tmp/file2``

pipeline.classpaths

(none)

List<String>

A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.

要打包和将发送到集群的作业罐的类路径的一个分号分隔的列表。这些必须是有效的url。

要以给定的名称在分布式缓存中注册的文件。这些文件可以从本地路径下(分布式)运行时中的任何用户定义函数访问。文件可以是本地文件(将通过BlobServer分发),或分布式文件系统中的文件。如果需要,运行时将把文件临时复制到本地缓存中。示例:名称:文件1,路径:文件:///tmp/file1;名称:文件2,路径:hdfs:///tmp/file2``

pipeline.closure-cleaner-level

RECURSIVE

Enum

Possible values: [NONE, TOP_LEVEL, RECURSIVE]

Configures the mode in which the closure cleaner works

  • NONE - disables the closure cleaner completely
  • TOP_LEVEL - cleans only the top-level class without recursing into fields
  • RECURSIVE - cleans all the fields recursively

配置关闭清洁器的工作模式

不,完全禁用关闭清洁器

TOP_LEVEL-只清理*类,而不会递归到字段中

递归-递归方式清理所有字段

pipeline.default-kryo-serializers

(none)

List<String>

Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers

Example:
class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2

类名和Kryo默认序列化类名的Kryo类名分离列表示例:类:org.example。检查类,序列化器:org.example。检查序列化器1;类:org.example。ExampleClass2,serializer:org.example.检查序列仪2

pipeline.force-avro

false

Boolean

Forces Flink to use the Apache Avro serializer for POJOs.

Important: Make sure to include the flink-avro module.

强制Flink对POJOs使用ApacheAvro序列化程序。重要事项:确保包括闪烁式模块。

pipeline.force-kryo

false

Boolean

If enabled, forces TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. In some cases this might be preferable. For example, when using interfaces with subclasses that cannot be analyzed as POJO.

如果启用,将强制打字提取器为POJOS使用Kryo序列化器,即使我们可以作为POJO进行分析。在某些情况下,这可能更可取。例如,当使用具有不能作为POJO进行分析的子类的接口时。

pipeline.generic-types

true

Boolean

If the use of generic types is disabled, Flink will throw an UnsupportedOperationException whenever it encounters a data type that would go through Kryo for serialization.
如果禁用了通用类型的使用,那么Flink在遇到将通过Kryo进行序列化的数据类型时,就会抛出一个不受支持的操作异常
Disabling generic types can be helpful to eagerly find and eliminate the use of types that would go through Kryo serialization during runtime. Rather than checking types individually, using this option will throw exceptions eagerly in the places where generic types are used.

禁用泛型类型可能有助于急切地查找并消除在运行期间将通过Kryo序列化的类型的使用。使用此选项将在使用通用类型的地方热切地抛出异常,而不是单独检查类型。

We recommend to use this option only during development and pre-production phases, not during actual production use. The application program and/or the input data may be such that new, previously unseen, types occur at some point. In that case, setting this option would cause the program to fail.

我们建议仅在开发和生产前阶段使用此选项,而不是在实际生产使用期间。应用程序和/或输入数据可能会在某个点出现新的、以前看不见的类型。在这种情况下,设置此选项将导致程序失败。

pipeline.global-job-parameters

(none)

Map

Register a custom, serializable user configuration object. The configuration can be accessed in operators

注册一个自定义的、可序列化的用户配置对象。该配置可以在操作员中访问

pipeline.jars

(none)

List<String>

A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.

要用分号分隔的jar和要发送到集群的作业jar的列表。这些路径必须是有效的路径。

pipeline.max-parallelism

-1

Integer

The program-wide maximum parallelism used for operators which haven't specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.

用于未指定最大并行性的操作符的程序范围内的最大并行性。最大并行性指定了动态缩放的上限和用于分区状态的关键组的数量。

pipeline.name

(none)

String

The job name used for printing and logging.

用于打印和日志记录的作业名称。

pipeline.object-reuse

false

Boolean

When enabled objects that Flink internally uses for deserialization and passing data to user-code functions will be reused. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behaviour.

当启用对象时,Flink内部用于反序列化和将数据传递给用户代码函数将被重用。请记住,当操作的用户代码函数没有意识到这种行为时,这可能会导致错误。

pipeline.operator-chaining

true

Boolean

Operator chaining allows non-shuffle operations to be co-located in the same thread fully avoiding serialization and de-serialization.

操作符链接允许非洗牌操作同时位于同一线程中,完全避免了序列化和去序列化。

pipeline.registered-kryo-types

(none)

List<String>

Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.

要在序列化堆栈中注册的类型的半节点分隔列表。如果该类型最终被序列化为POJO,则该类型将向POJO序列化程序注册。如果该类型最终用Kryo序列化,那么它将在Kryo上注册,以确保只写入标签。

pipeline.registered-pojo-types

(none)

List<String>

Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.

要在序列化堆栈中注册的类型的半节点分隔列表。如果该类型最终被序列化为POJO,则该类型将向POJO序列化程序注册。如果该类型最终用Kryo序列化,那么它将在Kryo上注册,以确保只写入标签。

Key

Default

Type

Description

pipeline.time-characteristic

ProcessingTime

Enum

Possible values: [ProcessingTime, IngestionTime, EventTime]

The time characteristic for all created streams, e.g., processingtime, event time, or ingestion time.

If you set the characteristic to IngestionTime or EventTime this will set a default watermark update interval of 200 ms. If this is not applicable for your application you should change it using pipeline.auto-watermark-interval.

所有已创建的流的时间特征,例如,处理时间、事件时间或摄入时间。如果您将该特性设置为输入时间或事件时间,这将设置一个默认的水印更新间隔为200ms。如果这不适用于您的应用程序,您应该使用pipeline.auto-watermark-interval更改它。

Checkpointing

Key

Default

Type

Description

execution.checkpointing.externalized-checkpoint-retention

(none)

Enum

Possible values: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]

Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus#FAILED or JobStatus#SUSPENDED. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.

The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED).

The target directory for externalized checkpoints is configured via state.checkpoints.dir.

外部检查点将其元数据写入持久存储,当拥有作业失败或挂起(终止时作业状态JobStatus#失败或JobStatus#暂停时不会自动清理。在这种情况下,您必须手动清理检查点状态,包括元数据和实际程序状态。该模式定义了在取消作业时应该如何清理外部化的检查点。如果选择在取消时保留外部化检查点,则必须在取消作业时手动清理检查点(以取消作业状态JobStatus#终止)。外部化检查点的目标目录是通过state.checkpoints.dir配置的。

execution.checkpointing.interval

(none)

Duration

Gets the interval in which checkpoints are periodically scheduled.

This setting defines the base interval. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause

获取定期调度检查点的时间间隔。此设置定义了基值间隔。检查点的触发可能会被设置execution.checkpointing.max-concurrent-checkpoints和execution.checkpointing.min-pause所延迟

execution.checkpointing.max-concurrent-checkpoints

1

Integer

The maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire.

可能同时进行的最大检查点尝试次数。如果此值为n,则当前运行n个检查点尝试时不会触发任何检查点。要触发下一个检查点,一个检查点尝试需要完成或过期。

execution.checkpointing.min-pause

0 ms

Duration

The minimal pause between checkpointing attempts. This setting defines how soon thecheckpoint coordinator may trigger another checkpoint after it becomes possible to triggeranother checkpoint with respect to the maximum number of concurrent checkpoints(see execution.checkpointing.max-concurrent-checkpoints).

If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure that a minimum amount of time passes where no checkpoint is in progress at all.

检查点指向尝试之间的最小暂停。此设置定义了在有可能触发另一个检查点后,检查点协调器触发另一个检查点的时间(参见execution.checkpointing.max-concurrent-checkpoints)。如果并发检查点的最大数量设置为1,此设置将有效地确保根本没有检查点的最小时间通过。

execution.checkpointing.mode

EXACTLY_ONCE

Enum

Possible values: [EXACTLY_ONCE, AT_LEAST_ONCE]

The checkpointing mode (exactly-once vs. at-least-once).

检查点模式(完全是一次与至少一次)。

execution.checkpointing.prefer-checkpoint-for-recovery

false

Boolean

If enabled, a job recovery should fallback to checkpoint when there is a more recent savepoint.

如果启用,当有最近的保存点时,作业恢复应该回落到检查点。

execution.checkpointing.timeout

10 min

Duration

The maximum time that a checkpoint may take before being discarded.

检查点在被丢弃之前可能花费的最大时间。

execution.checkpointing.tolerable-failed-checkpoints

(none)

Integer

The tolerable checkpoint failure number. If set to 0, that meanswe do not tolerance any checkpoint failure.

可容忍的检查点故障号。如果设置为0,则这意味着我们不容忍任何检查点失败。

execution.checkpointing.unaligned

false

Boolean

Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.

Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.

Unaligned checkpoints can only be enabled if execution.checkpointing.mode is EXACTLY_ONCE and if execution.checkpointing.max-concurrent-checkpoints is 1

启用不对齐的检查点,这大大减少了在背压下的检查点时间。未对齐的检查点包含作为检查区状态的一部分存储在缓冲区中的数据,这允许检查点屏障超过这些缓冲区。因此,由于检查点障碍不再有效地嵌入到数据流中,检查点持续时间独立于当前吞吐量。只有在execution.checkpointing.mode为EXACTLY_ONCE,且execution.checkpointing.max-concurrent-checkpoints为1时,才能启用未对齐的检查点

上一篇:oracle之PGA相关的sql


下一篇:并发编程:乱序执行的那些事儿五分钟给你整明白