在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行;
flink on yarn的前提是:hdfs、yarn均启动
修改hadoop的配置参数
vim etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
修改全局变量/etc/profile
vim /etc/profile
export HADOOP_CONF_DIR=/opt/cdh/hadoop/etc/Hadoop
YARN_CONF_DIR或者HADOOP_CONF_DIR必须将环境变量设置为读取YARN和HDFS配置
使用flink on yarn提交任务
在YARN上启动一个Flink主要有两种方式:
(1)、启动一个YARN session(Start a long-running Flink cluster on YARN);
(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)
这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)
通过./bin/yarn-session.sh脚本启动YARN Session
脚本可以携带的参数:
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
注意:
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。
在这种情况下,Flink YARN客户端只会将Flink提交给群集,然后关闭它自己
启动:
bin/yarn-session.sh -n 2 -tm 800 -s 2
上面的命令的意思是,同时向Yarn申请3个container(即便只申请了两个,因为ApplicationMaster和Job Manager有一个额外的容器。一旦将Flink部署到YARN群集中,它就会显示Job Manager的连接详细信息。),其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个ApplicationMaster(Job Manager)。
然后使用flink提交自带的任务:
bin/flink run examples/batch/WordCount.jar
停止当前任务:
1:CTRL+C
2:stop命令
3:yarn application -kill application_1527077715040_0007
分离的YARN会话
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。
在这种情况下,Flink YARN客户端只会将Flink提交给群集,然后关闭它自己。请注意,在这种情况下,无法使用Flink停止YARN会话。
使用YARN实用程序(yarn application -kill <appId>)停止YARN会话
通过分离yarn会话来执行:
bin/yarn-session.sh -n 2 -tm 800 -s 2 -d bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 700 -nm test
关闭:
yarn application -kill application_1527077715040_0007
第二种方式:在YARN上运行一个Flink作业
上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:
bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
以上命令在参数前加上y前缀,-yn表示TaskManager个数
在8088页面观察
停止yarn-cluster
yarn application -kill application的ID
注意:
在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml; 可以通过:-D <arg> Dynamic properties 来覆盖原有的配置信息:比如:
-Dfs.overwrite-files=true
-Dtaskmanager.network.numberOfBuffers=16368