flinkX快速启动


在flinkx启动之前,请提前安装git,maven,mysql(已同步mysql为例)。 windows安装提前安装好jdk和maven

下载代码

1.使用git工具把项目clone到本地,或者直接下载flinkx-1.8.5.zip

git clone https://github.com/DTStack/flinkx.git cd flinkx

2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.8.5.zip unzip flinkx-1.8.5.zip cd flink-1.8.5
编译插件
mvn clean package -DskipTests
编译会遇到文件权限不够的问题,提前给flinkx文件夹授权
编译时间需要几十分钟,耐心等待
编译完成后,会多出一个plugins目录(flinkx1.10版本后为syncplugins),这个目录存放这FlinkX的插件包
** 编译好的文件夹拷贝到其他服务器一样可以使用

准备任务

启动以mysql同步到mysql做实例。先创建两张表,往其中一个表内添加些数据

create table emp(
    id   int(10)     null,
    name varchar(10) null
);
insert into emp values(1,'aaa'),(2,'bbb');

create table emp2(
    id   int(10)     null,
    name varchar(10) null
);

首先准备要运行的任务json配置文件;
在flinkx文件目录下,新建job文件夹,在编写mysql2mysql.json文件
修改json配置文件内的数据库连接信息,有源表信息和目标表信息

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [{
              "jdbcUrl": ["jdbc:mysql://192.168.8.135:3306/testbase?useUnicode=true&characterEncoding=utf8"],
              "table": ["emp"]
            }],
            "column": ["id","name"],
            "customSql": "",
            "splitPk": "",
            "queryTimeOut": 1000,
            "requestAccumulatorInterval": 2
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://192.168.8.135:3306/testbase?useSSL=false",
                "table": ["emp2"]
              }
            ],
            "writeMode": "insert",
            "column": ["id","name"],
            "batchSize": 1024
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log" : {
        "isLogger": false,
        "level" : "debug",
        "path" : "",
        "pattern":""
      }
    }
  }
}

Local模式运行任务
在flinkx目录下,新建start.sh文件,添加以下内容
以本地模式启动程序

/usr/local/src/flinkx-1.8.5/bin/flinkx \ 
-mode local \ 
-job /usr/local/src/flinkx-1.8.5/job/mysql2mysql.json \ 
-pluginRoot /usr/local/src/flinkx-1.8.5/plugins \ 
-flinkconf /usr/local/src/flinkx-1.8.5/flinkconf \ 
-confProp "{\"flink.checkpoint.interval\":60000}"

启动
启动前给start.sh文件授权。启动后的日志就在flinkx/nohup.out内

chmod 777 start.sh
sh start.sh

成功运行后的日志是这样的
flinkX快速启动
数据表的变化
flinkX快速启动

上一篇:一文带你学会如何基于Flink构建实时计算平台


下一篇:一文详解数栈FlinkX实时采集原理与使用