A Standalone Cluster is a self-contained Flink cluster; the alternative is a Flink cluster running on YARN.
Requirements
- Java 1.8 installed, with the JAVA_HOME environment variable set
- Passwordless SSH login between all machines (see the sketch after this list)
- The same Flink directory layout on every machine
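A minimal sketch for setting up passwordless SSH, assuming the same user exists on every node (the user lin is taken from the shell prompt later in these notes):
ssh-keygen -t rsa              # generate a key pair on the master; accept the defaults
ssh-copy-id lin@192.168.1.2    # append the public key to each worker's authorized_keys
ssh-copy-id lin@192.168.1.3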
Download
Download the Flink distribution (https://flink.apache.org/downloads.html)
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.12.tgz
tar xzf flink-1.10.0-bin-scala_2.12.tgz
cd flink-1.10.0
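A quick sanity check of the layout (bin/ holds the scripts, conf/ the configuration, lib/ the jars on the classpath, examples/ the sample jobs used later):
ls bin conf lib examples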
If you need Hadoop integration
Before Flink 1.8, you had to download a Flink distribution bundled with Hadoop.
Since Flink 1.8, the Hadoop dependencies are distributed separately; download the shaded Hadoop jar into lib/:
cd flink-1.10.0/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
(The download page on the official site is not very clear about this.)
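If a Hadoop installation already exists on every node, the Flink 1.10 docs also describe an alternative to the shaded jar (assuming the hadoop command is on the PATH):
export HADOOP_CLASSPATH=$(hadoop classpath)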
Configuration
conf/flink-conf.yaml
lin@Ubuntu-VM-1:$ cat conf/flink-conf.yaml | grep -v "^#" | grep -v "^$"
jobmanager.rpc.address: 192.168.1.1 # JobManager IP
jobmanager.rpc.port: 6123 # JobManager RPC port, used for job submission
jobmanager.heap.size: 1024m # JobManager memory
taskmanager.memory.process.size: 1568m # TaskManager memory
taskmanager.numberOfTaskSlots: 1 # number of slots per TaskManager
parallelism.default: 1 # parallelism used when flink run does not specify one
jobmanager.execution.failover-strategy: region # failover strategy
# region restarts only the tasks in the affected region of the ExecutionGraph
# full restarts every task in the job, i.e. resets the whole ExecutionGraph
These are just the default entries; many other options are available.
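For example, to give each TaskManager more memory and two slots, you could change (the values here are illustrative, not recommendations):
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 2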
conf/masters lists the JobManager IP and the Web UI port
192.168.1.1:8081
conf/slaves lists the TaskManager IPs
192.168.1.2
192.168.1.3
Copy the configuration to all nodes.
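A minimal sketch, assuming Flink is unpacked at the same path (~/flink-1.10.0, an assumption) on every node:
for node in 192.168.1.2 192.168.1.3; do
    scp conf/flink-conf.yaml conf/masters conf/slaves ${node}:~/flink-1.10.0/conf/
done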
Startup
bin/start-cluster.sh
This script logs in to each node listed in conf/masters and starts a JobManager with
bin/jobmanager.sh
then logs in to each node listed in conf/slaves and starts a TaskManager with
bin/taskmanager.sh
On the master node you can see the following Java process running:
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
On the slave nodes you can see the following Java process running:
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
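A quick way to check these processes is jps, which ships with the JDK:
jps
On the master it should list StandaloneSessionClusterEntrypoint; on the slaves, TaskManagerRunner.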
You can open localhost:8081 on the master to inspect the cluster through the Web UI.
It shows 3 TaskManagers; since each TaskManager is configured with a single slot, there are 3 task slots in total.
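The same information is available from the REST API behind the Web UI (address assumed from conf/masters above):
curl http://192.168.1.1:8081/taskmanagers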
Submit a Job
Submit one of the example programs under the examples directory
bin/flink run examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
The command blocks until the program finishes, logging output like:
Job has been submitted with JobID f038a66d9b9d6c9e9b80b866dde2dacf
Program execution finished
Job with JobID f038a66d9b9d6c9e9b80b866dde2dacf has finished.
Job Runtime: 4508 ms
With the -d (detached) flag, the client exits right after submission instead of waiting for the job to finish
bin/flink run -d examples/batch/WordCount.jar --input ./README.txt --output ./wordcount.txt
The output looks like:
Job has been submitted with JobID 161410edb6b9a28ca69e84e5fe0885c3
The job can then be found under Completed Jobs in the Web UI.
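Jobs can also be checked from the command line; the -a flag includes finished jobs:
bin/flink list        # running and scheduled jobs
bin/flink list -a     # all jobs, including finished ones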
Besides run, bin/flink supports other actions; from the built-in help:
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
Action "info" shows the optimized execution plan of the program (JSON).
Syntax: info [OPTIONS] <jar-file> <arguments>
Action "list" lists running and scheduled programs.
Syntax: list [OPTIONS]
Action "stop" stops a running program with a savepoint (streaming jobs only).
Syntax: stop [OPTIONS] <Job ID>
Action "cancel" cancels a running program.
Syntax: cancel [OPTIONS] <Job ID>
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
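A few hedged usage examples of these actions (the JobID and savepoint directory are placeholders):
bin/flink stop <jobId>                        # stop a streaming job, taking a savepoint first
bin/flink savepoint <jobId> /tmp/savepoints   # trigger a savepoint for a running job
bin/flink cancel <jobId>                      # cancel a running job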
Some of the options of bin/flink run:
"run" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the
JAR file does not specify the class in
its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the program
when the savepoint was triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the configuration.
-py,--python <pythonFile> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles` option.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from
-sae,--shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
Options for executor mode:
-D <property=value> Generic configuration options for
execution/deployment and for the configured executor.
The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
-e,--executor <arg> The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
The run action also supports submitting Python programs; those options are not shown here.
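A hedged example combining several of these options (the class name and jar are placeholders; in Flink 1.10 the -m address is the REST endpoint, i.e. the Web UI port):
bin/flink run -d -p 2 -m 192.168.1.1:8081 -c com.example.MyJob my-job.jar --input in.txt --output out.txt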
Adding a JobManager/TaskManager to the cluster
On the master node to be added, run
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
On the slave node to be added, run
bin/taskmanager.sh start|start-foreground|stop|stop-all
The running cluster does not need to be stopped.
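For example, to add a new TaskManager on 192.168.1.4 (a hypothetical node), copy the Flink directory there, then run on that node:
bin/taskmanager.sh start
Also add the new IP to conf/slaves so that start-cluster.sh / stop-cluster.sh will manage it as well.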
Stopping the cluster
bin/stop-cluster.sh