注意:
目前程序每次启动都会先对两张表做全量读取,再执行 join(而不是增量续跑)
1、pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build file for the Flink SQL CDC join demo (entry point: org.sql.test.SqlJoinMysqlCDCTest). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Centralized version management (全局版本控制) -->
    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.11.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>mysql</groupId>-->
<!--            <artifactId>mysql-connector-java</artifactId>-->
<!--            <version>8.0.16</version>-->
<!--        </dependency>-->
        <!-- add the dependency matching your database -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Was hard-coded to 1.11.1; aligned with flink.version to avoid mixed Flink jars. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- old planner flink table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- new (blink) planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-format-changelog-json</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- Logging dependencies; Flink fails at startup without an SLF4J binding. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Fat-jar via shade plugin (kept for reference, currently disabled) -->
<!--            <plugin>-->
<!--                <groupId>org.apache.maven.plugins</groupId>-->
<!--                <artifactId>maven-shade-plugin</artifactId>-->
<!--                <version>2.3</version>-->
<!--                <executions>-->
<!--                    <execution>-->
<!--                        <phase>package</phase>-->
<!--                        <goals>-->
<!--                            <goal>shade</goal>-->
<!--                        </goals>-->
<!--                        <configuration>-->
<!--                            <filters>-->
<!--                                <filter>-->
<!--                                    <artifact>*:*</artifact>-->
<!--                                    <excludes>-->
<!--                                        <exclude>META-INF/*.SF</exclude>-->
<!--                                        <exclude>META-INF/*.DSA</exclude>-->
<!--                                        <exclude>META-INF/*.RSA</exclude>-->
<!--                                    </excludes>-->
<!--                                </filter>-->
<!--                            </filters>-->
<!--                            <transformers>-->
<!--                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
<!--                                </transformer>-->
<!--                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>-->
<!--                            </transformers>-->
<!--                        </configuration>-->
<!--                    </execution>-->
<!--                </executions>-->
<!--            </plugin>-->
            <!-- Fat-jar via assembly plugin (includes all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Main class of the jar; was test.SqlJoinKafka10Test, which does
                                 not exist in this project — fixed to match the actual object
                                 declared in the source (package org.sql.test). -->
                            <mainClass>org.sql.test.SqlJoinMysqlCDCTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2、处理主程序
package org.sql.test

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
import org.apache.flink.table.api.bridge.scala._

/**
 * @program: demo
 * @description: Reads the MySQL tables `user` and `user_info` through the
 *               mysql-cdc connector, left-joins them on the id column and
 *               continuously upserts the result into the MySQL table
 *               `user_all` through the JDBC connector.
 *               (Original header said "reads from kafka 0.10", which did not
 *               match the actual code — corrected.)
 * @author: yang
 * @create: 2021-01-15 11:48
 */
object SqlJoinMysqlCDCTest {

  def main(args: Array[String]): Unit = {
    // 1. Environment setup: HDFS-backed checkpointing + blink-planner table environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://uat-datacenter1:8020/flink/mysql/checkpoints"))
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    // Retry the job 3 times, 10 seconds apart, before failing permanently.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
    // Keep externalized checkpoints around when the job is cancelled.
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // CDC source table 1: MySQL `user`.
    // NOTE: the connector options must use plain ASCII single quotes; the original
    // used typographic quotes (‘…‘), which the SQL parser rejects — fixed.
    val createMysqlUserTable =
      """
        |CREATE TABLE `user` (
        |  id INT,
        |  name STRING,
        |  create_time TIMESTAMP(3)
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'database-name' = 'test',
        |  'table-name' = 'user'
        |)
      """.stripMargin

    // CDC source table 2: MySQL `user_info`.
    // Hostname typo fixed: was 'locahost'.
    val createMysqlUserInfoTable =
      """
        |CREATE TABLE user_info (
        |  id INT,
        |  username STRING,
        |  password STRING
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'database-name' = 'test',
        |  'table-name' = 'user_info'
        |)
      """.stripMargin

    // JDBC sink table; the (non-enforced) primary key makes the sink write in upsert mode.
    val createMysqlSinkTable =
      """
        |CREATE TABLE user_all (
        |  id INT,
        |  name STRING,
        |  create_time TIMESTAMP(3),
        |  bid INT,
        |  username STRING,
        |  password STRING,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 'user_all',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'sink.buffer-flush.max-rows' = '2',
        |  'sink.buffer-flush.interval' = '1s'
        |)
      """.stripMargin

    // Left-join the two CDC streams on user.id = user_info.id and write to the sink.
    val joinSql =
      """
        |INSERT INTO user_all
        |SELECT u.id, u.name, u.create_time, ui.bid, ui.username, ui.password
        |FROM (SELECT id, name, create_time FROM `user`) AS u
        |LEFT JOIN (SELECT id AS bid, username, password FROM user_info) AS ui
        |ON u.id = ui.bid
      """.stripMargin

    tableEnv.executeSql(createMysqlUserTable)
    tableEnv.executeSql(createMysqlUserInfoTable)
    tableEnv.executeSql(createMysqlSinkTable)

    // executeSql on an INSERT submits the job; print() reports the submission result.
    val result: TableResult = tableEnv.executeSql(joinSql)
    result.print()
  }
}
3、mysql表结构
-- Source table 1: CDC source joined on `id`.
CREATE TABLE `user` (
  `id` int NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` decimal(11,2) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Source table 2: CDC source providing username/password.
-- The COMMENT string originally used typographic quotes (‘主键‘), which MySQL
-- rejects — replaced with plain ASCII single quotes.
CREATE TABLE `user_info` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `username` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1329627045540970498 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- Sink table: receives the left-join result written by the Flink JDBC connector.
CREATE TABLE `user_all` (
  `id` int NOT NULL,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `bid` int DEFAULT NULL,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
问题:
1、Flink CDC(此处使用的 1.1.0 版本)目前对 checkpoint 支持不完善,作业无法从 checkpoint 恢复后继续执行。