在flinkx启动之前,请提前安装git,maven,mysql(已同步mysql为例)。 windows安装提前安装好jdk和maven
下载代码
1.使用git工具把项目clone到本地,或者直接下载flinkx-1.8.5.zip
git clone https://github.com/DTStack/flinkx.git cd flinkx
2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.8.5.zip unzip flinkx-1.8.5.zip cd flink-1.8.5
编译插件
mvn clean package -DskipTests
编译会遇到文件权限不够的问题,提前给flinkx文件夹授权
编译时间需要几十分钟,耐心等待
编译完成后,会多出一个plugins目录(flinkx1.10版本后为syncplugins),这个目录存放这FlinkX的插件包
** 编译好的文件夹拷贝到其他服务器一样可以使用
准备任务
启动以mysql同步到mysql做实例。先创建两张表,往其中一个表内添加些数据
create table emp(
id int(10) null,
name varchar(10) null
);
insert into emp values(1,'aaa'),(2,'bbb');
create table emp2(
id int(10) null,
name varchar(10) null
);
首先准备要运行的任务json配置文件;
在flinkx文件目录下,新建job文件夹,在编写mysql2mysql.json文件
修改json配置文件内的数据库连接信息,有源表信息和目标表信息
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [{
"jdbcUrl": ["jdbc:mysql://192.168.8.135:3306/testbase?useUnicode=true&characterEncoding=utf8"],
"table": ["emp"]
}],
"column": ["id","name"],
"customSql": "",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.8.135:3306/testbase?useSSL=false",
"table": ["emp2"]
}
],
"writeMode": "insert",
"column": ["id","name"],
"batchSize": 1024
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
},
"errorLimit": {
"record": 100
},
"restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
},
"log" : {
"isLogger": false,
"level" : "debug",
"path" : "",
"pattern":""
}
}
}
}
Local模式运行任务
在flinkx目录下,新建start.sh文件,添加以下内容
以本地模式启动程序
/usr/local/src/flinkx-1.8.5/bin/flinkx \
-mode local \
-job /usr/local/src/flinkx-1.8.5/job/mysql2mysql.json \
-pluginRoot /usr/local/src/flinkx-1.8.5/plugins \
-flinkconf /usr/local/src/flinkx-1.8.5/flinkconf \
-confProp "{\"flink.checkpoint.interval\":60000}"
启动
启动前给start.sh文件授权。启动后的日志就在flinkx/nohup.out内
chmod 777 start.sh
sh start.sh
成功运行后的日志是这样的
数据表的变化