Flink : Standalone Cluster

Standalone Cluster 就是独立的 Flink 集群,相对应的有基于 YARN 的 Flink 集群

要求

  1. Java 1.8 和 JAVA_HOME 环境变量
  2. 不同机器之间支持 SSH 免密码登陆
  3. 不同机器都有相同的 Flink 目录结构

下载

Flink 包 (https://flink.apache.org/downloads.html

wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz

tar xzf flink-1.10.0-bin-scala_2.12.tgz

cd flink-1.10.0

如果需要和 Hadoop 集成
在 Flink 1.8 之前,需要下载带有 Hadoop 的 Flink 包
从 Flink 1.8 开始,Hadoop 的包被单独分离出来

cd flink-1.10.0/lib

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar

(官网那个 download 页面也写的不很清楚)

配置

conf/flink-conf.yaml

lin@Ubuntu-VM-1:$ cat conf/flink-conf.yaml | grep -v "^#" | grep -v "^$"

jobmanager.rpc.address: 192.168.1.1                # jobmanager ip
jobmanager.rpc.port: 6123                          # jobmanager port,用于提交 job 
jobmanager.heap.size: 1024m                        # jobmanager 内存

taskmanager.memory.process.size: 1568m             # task manager 内存
taskmanager.numberOfTaskSlots: 1                   # 每个 task manager 有多少个 slot

parallelism.default: 1                             # flink run 命令不指定并行度时,默认使用 1

jobmanager.execution.failover-strategy: region     # 故障重启策略
                                                   # region 代表只重启 ExecutionGraph 中对应的 Region 的 Task
                                                   # full 代表重启 Job 中所有的 Task,即重置整个 ExecutionGraph

这是默认的配置项,还有很多其他选项

conf/masters 配置 Job Manager 的 IP 和 Web UI 的端口

192.168.1.1:8081

conf/slaves 配置 Task Manager 的 IP

192.168.1.2
192.168.1.3

把配置 copy 到所有节点

启动

bin/start-cluster.sh

这个脚本到 conf/masters 配置的节点上调用下面的命令启动 Job Manager

bin/jobmanager.sh

再到 conf/slaves 配置的节点上调用下面的命令启动 Task Manager

bin/taskmanager.sh

在 master 节点上可以看到启动了下面的 Java 程序

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

在 slave 节点上可以看到启动了下面的 Java 程序

org.apache.flink.runtime.taskexecutor.TaskManagerRunner

可以登录 master 的 localhost:8081 通过 Web UI 查看
Flink : Standalone Cluster
可以看到有 3 个 Task Manager,由于每个 Task Manager 只配了一个 slot,总共是 3 个 Task Slot

提交 Job

提交 examples 目录下的例子程序

bin/flink run examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt

会一直等待直到程序完成,输出日志如下

Job has been submitted with JobID f038a66d9b9d6c9e9b80b866dde2dacf
Program execution finished
Job with JobID f038a66d9b9d6c9e9b80b866dde2dacf has finished.
Job Runtime: 4508 ms

可以通过 -d (detach) 参数提交后就退出,不用等待程序结束

bin/flink run -d examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt

输出日志如下

Job has been submitted with JobID 161410edb6b9a28ca69e84e5fe0885c3

可以到 Web UI 的 Completed Jobs 下查看
Flink : Standalone Cluster
bin/flink 除了 run 还有其他命令,简单介绍如下

Action "run" compiles and runs a program.
  Syntax: run [OPTIONS] <jar-file> <arguments>

Action "info" shows the optimized execution plan of the program (JSON).
  Syntax: info [OPTIONS] <jar-file> <arguments>

Action "list" lists running and scheduled programs.
  Syntax: list [OPTIONS]

Action "stop" stops a running program with a savepoint (streaming jobs only).
  Syntax: stop [OPTIONS] <Job ID>

Action "cancel" cancels a running program.
  Syntax: cancel [OPTIONS] <Job ID>

Action "savepoint" triggers savepoints for a running job or disposes existing ones.
  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]

bin/flink run 的部分参数简单介绍

  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main()" method). Only needed if the
                                          JAR file does not specify the class in
                                          its manifest.

     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.

     -d,--detached                        If present, runs the job in detached mode

     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the triggered.

     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the configuration.

     -py,--python <pythonFile>            Python script with the program entry
                                          point. The dependent resources can be
                                          configured with the `--pyFiles` option.

     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from

     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.

  Options for executor mode:
     -D <property=value>   Generic configuration options for
                           execution/deployment and for the configured executor.
                           The available options can be found at
                           https://ci.apache.org/projects/flink/flink-docs-stabl
                           e/ops/config.html

     -e,--executor <arg>   The name of the executor to be used for executing the
                           given job, which is equivalent to the
                           "execution.target" config option. The currently
                           available executors are: "remote", "local",
                           "kubernetes-session", "yarn-per-job", "yarn-session".

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode

run 命令支持提交 python 程序,这里没列出来

添加 JobManager/TaskManager 到集群

在要添加的 master 节点执行

bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

在要添加的 slave 节点执行

bin/taskmanager.sh start|start-foreground|stop|stop-all

不需要停止服务

停止集群

stop-cluster.sh


上一篇:Latex standalone tips


下一篇:第 7 节 Flink standalone集群HA配置