flinkx使用指南

下载代码

1.使用git工具把项目clone到本地

git clone https://github.com/DTStack/flinkx.git
cd flinkx

2.直接下载源码

wget https://github.com/DTStack/flinkx/archive/1.8.5.zip
unzip flinkx-1.8.5.zip
cd flink-1.8.5

编译插件

mvn clean package -DskipTests

运行任务

首先准备要运行的任务json,这里以stream插件为例:

{
  "job" : {
    "content" : [ {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "name": "id",
              "type": "int"
            },
            {
              "name": "name",
              "type": "string"
            }
          ]
        }
      },
      "writer" : {
        "parameter" : {
          "print": false
        },
        "name" : "streamwriter"
      }
    } ],
    "setting" : {
      "restore" : {
        "isRestore" : false,
        "isStream" : false
      },
      "errorLimit" : {
      },
      "speed" : {
        "bytes" : 0,
        "channel" : 1,
        "rebalance" : true
      }
    }
  }
}

Local模式运行任务

命令模板:

bin/flinkx -mode local \
                     -job $FLINK_HOME/examples/stream_example.json \
           -pluginRoot $FLINK_HOME/plugins \
           -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"

可以在flink的配置文件里配置端口:

## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888

使用下面的命令运行任务:

./bin/flinkx -job ./job/stream.json 
             -flinkconf $FLINK_HOME/conf

任务运行后可以通过8888端口访问flink界面查看任务运行情况:

flinkx使用指南

Standalone模式运行

命令模板:

bin/flinkx -mode standalone \
           -job $FLINK_HOME/examples/stream_example.json \
           -pluginRoot $FLINK_HOME/plugins \
           -flinkconf $FLINK_HOME/conf \
           -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"

首先启动flink集群:

# flink集群默认端口是8081
$FLINK_HOME/bin/start-cluster.sh

通过8081端口检查集群是否启动成功

flinkx使用指南

把任务提交到集群上运行:

./bin/flinkx -mode standalone \
             -job ./job/stream.json \
             -flinkconf $FLINK_HOME/conf

在集群上查看任务运行情况

flinkx使用指南

以Yarn Session模式运行任务

命令示例:

bin/flinkx -mode yarn \
           -job $FLINK_HOME/examples/stream_example.json \
           -pluginRoot $FLINK_HOME/plugins \
           -flinkconf $FLINK_HOME/conf \
           -yarnconf $HADOOP_HOME/etc/hadoop \
           -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"

首先确保yarn集群是可用的,然后手动启动一个yarn session:

$FLINK_HOME/bin/yarn-session.sh -n 1 -s 2 -jm 1024 -tm 1024
flinkx使用指南flinkx使用指南

把任务提交到这个yarn session上:

./bin/flinkx -mode yarn \
             -job ./job/stream.json \
             -flinkconf $FLINK_HOME/conf \
             -yarnconf $HADOOP_HOME/etc/hadoop

然后在flink界面查看任务运行情况:

flinkx使用指南

以Yarn Perjob模式运行任务

命令示例:

bin/flinkx -mode yarnPer \
           -job ${FLINK_HOME}/examples/stream_example.json \
           -pluginRoot $FLINK_HOME/plugins \
           -flinkconf $FLINK_HOME/conf \
           -yarnconf $HADOOP_HOME/etc/hadoop \
           -flinkLibJar $FLINK_HOME/lib \
           -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" \ 
           -queue default \
           -pluginLoadMode classpath

首先确保yarn集群是可用的,启动一个Yarn Application运行任务:

bin/flinkx -mode yarnPer \
           -job ./job/stream.json \
           -flinkconf $FLINK_HOME/conf \
           -yarnconf $HADOOP_HOME/etc/hadoop \
           -flinkLibJar $FLINK_HOME/lib \
           -pluginLoadMode classpath

然后在集群上查看任务运行情况

flinkx使用指南flinkx使用指南

参数说明

名称 说明 可选值 是否必填 默认值
model 执行模式,也就是flink集群的工作模式 1.local: 本地模式
2.standalone: 独立部署模式的flink集群
3.yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
4.yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster"
local
job 数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息
jobid 任务名称 Flink Job
pluginRoot 插件根目录地址,也就是打包后产生的pluginRoot目录。 $FLINKX_HOME/plugins
flinkconf flink配置文件所在的目录(单机模式下不需要) $FLINK_HOME/conf $FLINK_HOME/conf
flinkLibJar flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib $FLINK_HOME/lib $FLINK_HOME/lib
yarnconf Hadoop配置文件(包括hdfs和yarn)所在的目录 $HADOOP_HOME/etc/hadoop $HADOOP_HOME/etc/hadoop
queue yarn队列,如default default
pluginLoadMode yarn session模式插件加载方式 1.classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快
2.shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境
shipfile
confProp checkpoint配置 flink.checkpoint.interval:快照生产频率
flink.checkpoint.stateBackend:快照存储路径
s checkpoint快照路径

常见问题

1.编译找不到DB2、达梦、gbase、ojdbc8等驱动包

解决办法:在$FLINKX_HOME/jars目录下有这些驱动包,可以手动安装,也可以使用插件提供的脚本安装:

## windows平台
./install_jars.bat

## unix平台
./install_jars.sh
上一篇:Flink基础教程:FlinkX RDB介绍与基本演示


下一篇:袋鼠云研发手记 | 数栈·开源:Github上400+Star的硬核分布式同步工具FlinkX