下载代码
1.使用git工具把项目clone到本地
git clone https://github.com/DTStack/flinkx.git
cd flinkx
2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.8.5.zip
unzip flinkx-1.8.5.zip
cd flink-1.8.5
编译插件
mvn clean package -DskipTests
运行任务
首先准备要运行的任务json,这里以stream插件为例:
{
"job" : {
"content" : [ {
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
}
]
}
},
"writer" : {
"parameter" : {
"print": false
},
"name" : "streamwriter"
}
} ],
"setting" : {
"restore" : {
"isRestore" : false,
"isStream" : false
},
"errorLimit" : {
},
"speed" : {
"bytes" : 0,
"channel" : 1,
"rebalance" : true
}
}
}
}
Local模式运行任务
命令模板:
bin/flinkx -mode local \
-job $FLINK_HOME/examples/stream_example.json \
-pluginRoot $FLINK_HOME/plugins \
-confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"
可以在flink的配置文件里配置端口:
## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
使用下面的命令运行任务:
./bin/flinkx -job ./job/stream.json
-flinkconf $FLINK_HOME/conf
任务运行后可以通过8888端口访问flink界面查看任务运行情况:
Standalone模式运行
命令模板:
bin/flinkx -mode standalone \
-job $FLINK_HOME/examples/stream_example.json \
-pluginRoot $FLINK_HOME/plugins \
-flinkconf $FLINK_HOME/conf \
-confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"
首先启动flink集群:
# flink集群默认端口是8081
$FLINK_HOME/bin/start-cluster.sh
通过8081端口检查集群是否启动成功
把任务提交到集群上运行:
./bin/flinkx -mode standalone \
-job ./job/stream.json \
-flinkconf $FLINK_HOME/conf
在集群上查看任务运行情况
以Yarn Session模式运行任务
命令示例:
bin/flinkx -mode yarn \
-job $FLINK_HOME/examples/stream_example.json \
-pluginRoot $FLINK_HOME/plugins \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}"
首先确保yarn集群是可用的,然后手动启动一个yarn session:
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 2 -jm 1024 -tm 1024
把任务提交到这个yarn session上:
./bin/flinkx -mode yarn \
-job ./job/stream.json \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop
然后在flink界面查看任务运行情况:
以Yarn Perjob模式运行任务
命令示例:
bin/flinkx -mode yarnPer \
-job ${FLINK_HOME}/examples/stream_example.json \
-pluginRoot $FLINK_HOME/plugins \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-flinkLibJar $FLINK_HOME/lib \
-confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" \
-queue default \
-pluginLoadMode classpath
首先确保yarn集群是可用的,启动一个Yarn Application运行任务:
bin/flinkx -mode yarnPer \
-job ./job/stream.json \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-flinkLibJar $FLINK_HOME/lib \
-pluginLoadMode classpath
然后在集群上查看任务运行情况
参数说明
名称 | 说明 | 可选值 | 是否必填 | 默认值 |
---|---|---|---|---|
model | 执行模式,也就是flink集群的工作模式 | 1.local: 本地模式 2.standalone: 独立部署模式的flink集群 3.yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster" 4.yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster" |
否 | local |
job | 数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息 | 无 | 是 | 无 |
jobid | 任务名称 | 无 | 否 | Flink Job |
pluginRoot | 插件根目录地址,也就是打包后产生的pluginRoot目录。 | 无 | 否 | $FLINKX_HOME/plugins |
flinkconf | flink配置文件所在的目录(单机模式下不需要) | $FLINK_HOME/conf | 否 | $FLINK_HOME/conf |
flinkLibJar | flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.8.1/lib | $FLINK_HOME/lib | 否 | $FLINK_HOME/lib |
yarnconf | Hadoop配置文件(包括hdfs和yarn)所在的目录 | $HADOOP_HOME/etc/hadoop | 否 | $HADOOP_HOME/etc/hadoop |
queue | yarn队列,如default | 无 | 否 | default |
pluginLoadMode | yarn session模式插件加载方式 | 1.classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快 2.shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境 |
否 | shipfile |
confProp | checkpoint配置 |
flink.checkpoint.interval:快照生产频率 flink.checkpoint.stateBackend:快照存储路径 |
否 | 无 |
s | checkpoint快照路径 | 否 | 无 |
常见问题
1.编译找不到DB2、达梦、gbase、ojdbc8等驱动包
解决办法:在$FLINKX_HOME/jars目录下有这些驱动包,可以手动安装,也可以使用插件提供的脚本安装:
## windows平台
./install_jars.bat
## unix平台
./install_jars.sh