描述:
利用 Flink CDC 读取 MySQL binlog,将数据从 MySQL 中抽取出来并发送至 Kafka。
1、pom.xml文件依赖
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the FlinkSqlTest module: Flink 1.11.3 / Scala 2.11 with the
  MySQL CDC source connector, the Kafka 0.10 connector and the changelog-json
  format. The assembly plugin packages a jar-with-dependencies.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkSql</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>FlinkSqlTest</artifactId>

    <properties>
        <flink.version>1.11.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink-cdc.version>1.1.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): slf4j-simple and slf4j-log4j12 (declared further down) are both
             SLF4J bindings; with both on the classpath SLF4J warns about multiple bindings
             and picks one. Consider keeping only one of them. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <!-- <scope>test</scope>-->
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>mysql</groupId>-->
        <!--     <artifactId>mysql-connector-java</artifactId>-->
        <!--     <version>8.0.16</version>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Kept on ${flink.version} so the JSON format matches the rest of the Flink artifacts. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- old planner flink table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <!-- new (Blink) planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-format-changelog-json</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- Logging dependencies; Flink requires them, otherwise it reports errors -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds the jar-with-dependencies during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry class of the jar (optional); must name the job's main object -->
                            <mainClass>test.SqlJoinMysqlKafkaCDCTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2、代码
package test

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}

/**
 * Demo job: reads the MySQL tables `user` and `user_info` through the `mysql-cdc`
 * connector, left-joins them on the user id and writes the resulting changelog to
 * the Kafka topic `user_all` in `changelog-json` format.
 *
 * Program: demo
 * Author: yang
 * Created: 2021-01-15 11:48
 */
object SqlJoinMysqlKafkaCDCTest {

  // Connection settings shared by both MySQL CDC source tables; adjust to your environment.
  private val MysqlHost = "localhost"
  private val MysqlPort = "3306"
  private val MysqlUser = "root"
  private val MysqlPassword = "root"
  private val MysqlDatabase = "test"

  /**
   * Configures checkpointing and the restart strategy, registers the two CDC source
   * tables and the Kafka sink table, then submits the INSERT ... SELECT join.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Environment setup
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Alternative checkpoint locations:
    // env.setStateBackend(new FsStateBackend("hdfs://localhost:8020/flink/mysql/checkpoints"))
    // env.setStateBackend(new FsStateBackend("file:///root/flink1.11/flink-1.11.3/job/checkpoint"))
    env.setStateBackend(new FsStateBackend("file:///D://tmp//flink/mysql"))

    // Checkpoint every 8 s in exactly-once mode, allow two concurrent checkpoints and
    // keep externalized checkpoints when the job is cancelled.
    env.enableCheckpointing(8000)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    checkpointConfig.setMaxConcurrentCheckpoints(2)
    checkpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Restart at most 3 times, waiting 10 seconds between attempts.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 2. Source table 1: MySQL `user`. `user` is back-quoted because it is a reserved word.
    // Option keys and values must use plain ASCII single quotes.
    val createMysqlUserTable =
      s"""
         |CREATE TABLE `user` (
         |  id INT,
         |  name STRING,
         |  create_time TIMESTAMP(3)
         |) WITH (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '$MysqlHost',
         |  'port' = '$MysqlPort',
         |  'username' = '$MysqlUser',
         |  'password' = '$MysqlPassword',
         |  'database-name' = '$MysqlDatabase',
         |  'table-name' = 'user'
         |)
       """.stripMargin

    // 3. Source table 2: MySQL `user_info`.
    val createMysqlUserInfoTable =
      s"""
         |CREATE TABLE user_info (
         |  id INT,
         |  username STRING,
         |  password STRING
         |) WITH (
         |  'connector' = 'mysql-cdc',
         |  'hostname' = '$MysqlHost',
         |  'port' = '$MysqlPort',
         |  'username' = '$MysqlUser',
         |  'password' = '$MysqlPassword',
         |  'database-name' = '$MysqlDatabase',
         |  'table-name' = 'user_info'
         |)
       """.stripMargin

    // 4. Sink table: Kafka topic `user_all`, written as changelog-json records.
    val createKafkaSinkTable =
      """
        |CREATE TABLE user_all (
        |  id INT,
        |  name STRING,
        |  create_time TIMESTAMP(3),
        |  bid INT,
        |  username STRING,
        |  password STRING,
        |  primary key(id) not ENFORCED
        |) WITH (
        |  'connector' = 'kafka-0.10',
        |  'topic' = 'user_all',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'changelog-json'
        |)
      """.stripMargin

    // 5. Left join `user` with `user_info` on the id and insert the result into the sink.
    val joinInsertSql =
      """
        |insert into user_all
        |  select u.id,u.name,u.create_time,ui.bid,ui.username,ui.password
        |  from (select id,name,create_time from `user`) as u
        |  left JOIN (select id as bid,username,password from user_info) as ui
        |  on u.id = ui.bid
      """.stripMargin

    tableEnv.executeSql(createMysqlUserTable)
    tableEnv.executeSql(createMysqlUserInfoTable)
    tableEnv.executeSql(createKafkaSinkTable)

    val result: TableResult = tableEnv.executeSql(joinInsertSql)
    result.print()
  }
}
3、创建topic
# Create the user_all topic (1 partition, replication factor 1) via ZooKeeper.
kafka-topics --create \
  --zookeeper h1:2181,h2:2181,h3:2181 \
  --topic user_all \
  --partitions 1 \
  --replication-factor 1
4、消费user_all topic,并查看数据结构
# Read the user_all topic from the beginning and print each record.
kafka-console-consumer \
  --zookeeper h1:2181,h2:2181,h3:2181 \
  --topic user_all \
  --from-beginning
{"data":{"id":3,"name":"1","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"-D"}
{"data":{"id":3,"name":"name3","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"+I"}
{"data":{"id":4,"name":"1","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"-D"}
{"data":{"id":4,"name":"name4","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"+I"}