Flink CDC：MySQL 到 Kafka

描述:

利用 Flink CDC 读取 MySQL 的 binlog，将数据从 MySQL 中实时抽取出来并发送至 Kafka。

1、pom.xml文件依赖

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Build for the Flink CDC demo (MySQL binlog, Flink SQL join, Kafka sink).
    Every Flink module uses ${flink.version} and every Scala-suffixed artifact uses
    ${scala.binary.version}, so the versions cannot drift apart.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkSql</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>FlinkSqlTest</artifactId>
    <properties>
        <flink.version>1.11.3</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink-cdc.version>1.1.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
            slf4j-simple is disabled: slf4j-log4j12 (declared below) is already an SLF4J binding,
            and two bindings on the classpath make SLF4J warn and pick one of them arbitrarily.
        -->
        <!--
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        -->

        <!--                <dependency>-->
        <!--                    <groupId>mysql</groupId>-->
        <!--                    <artifactId>mysql-connector-java</artifactId>-->
        <!--                    <version>8.0.16</version>-->
        <!--                </dependency>-->

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <!-- Previously pinned to 1.11.1; aligned with the other Flink modules. -->
            <version>${flink.version}</version>
        </dependency>
        <!-- old planner flink table-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <!--new planner-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-format-changelog-json</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- Logging dependencies; the author notes Flink reports an error without them. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry class of the fat jar (optional); points to the CDC job below. -->
                            <mainClass>test.SqlJoinMysqlKafkaCDCTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2、代码

package test

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}

/**
 * Flink CDC demo: streams row changes of the MySQL tables `test.user` and `test.user_info`
 * through the `mysql-cdc` connector, left-joins them on `id`, and writes the resulting changelog
 * to the Kafka 0.10 topic `user_all` in `changelog-json` format.
 *
 * An optional first program argument overrides the checkpoint directory
 * (default: `file:///D://tmp//flink/mysql`).
 *
 * @program: demo
 * @author: yang
 * @create: 2021-01-15 11:48
 */
object SqlJoinMysqlKafkaCDCTest {

  // MySQL connection settings shared by both mysql-cdc source tables; adjust to your environment.
  private val MysqlHost = "localhost"
  private val MysqlPort = 3306
  private val MysqlUser = "root"
  private val MysqlPassword = "root"
  private val MysqlDatabase = "test"

  // Kafka brokers for the `user_all` sink topic.
  private val KafkaBootstrapServers = "localhost:9092"

  // Checkpoint directory used when no program argument is supplied.
  private val DefaultCheckpointDir = "file:///D://tmp//flink/mysql"

  def main(args: Array[String]): Unit = {
    // 1. Execution environment: state backend, checkpointing and restart strategy.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Other checkpoint locations used with this job:
    //   "hdfs://localhost:8020/flink/mysql/checkpoints"
    //   "file:///root/flink1.11/flink-1.11.3/job/checkpoint"
    val checkpointDir = args.headOption.getOrElse(DefaultCheckpointDir)
    env.setStateBackend(new FsStateBackend(checkpointDir))
    env.enableCheckpointing(8000)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    checkpointConfig.setMaxConcurrentCheckpoints(2)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
    // Keep externalized checkpoints when the job is cancelled so it can be restored from them.
    checkpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 2a. CDC source for MySQL table `user` (quoted because USER is a reserved word in Flink SQL).
    //     Option values must use plain ASCII single quotes, or the DDL does not parse.
    val createUserSourceDdl =
      s"""
         |CREATE TABLE `user` (
         |    id INT,
         |    name STRING,
         |    create_time TIMESTAMP(3)
         |) WITH (
         |    'connector' = 'mysql-cdc',
         |    'hostname' = '$MysqlHost',
         |    'port' = '$MysqlPort',
         |    'username' = '$MysqlUser',
         |    'password' = '$MysqlPassword',
         |    'database-name' = '$MysqlDatabase',
         |    'table-name' = 'user'
         |)
      """.stripMargin

    // 2b. CDC source for MySQL table `user_info`, same MySQL instance.
    val createUserInfoSourceDdl =
      s"""
         |CREATE TABLE user_info (
         |    id INT,
         |    username STRING,
         |    password STRING
         |) WITH (
         |    'connector' = 'mysql-cdc',
         |    'hostname' = '$MysqlHost',
         |    'port' = '$MysqlPort',
         |    'username' = '$MysqlUser',
         |    'password' = '$MysqlPassword',
         |    'database-name' = '$MysqlDatabase',
         |    'table-name' = 'user_info'
         |)
      """.stripMargin

    // 3. Kafka sink. `changelog-json` writes each record's change kind into the "op" field,
    //    so retractions produced by the join are preserved in the topic.
    //    'scan.startup.mode' only matters if this table is also read as a source.
    val createKafkaSinkDdl =
      s"""
         |CREATE TABLE user_all (
         |    id INT,
         |    name STRING,
         |    create_time TIMESTAMP(3),
         |    bid INT,
         |    username STRING,
         |    password STRING,
         |    primary key(id) not ENFORCED
         |) WITH (
         |   'connector' = 'kafka-0.10',
         |   'topic' = 'user_all',
         |   'scan.startup.mode' = 'earliest-offset',
         |   'properties.bootstrap.servers' = '$KafkaBootstrapServers',
         |   'format' = 'changelog-json'
         |)
      """.stripMargin

    // 4. Continuous LEFT JOIN: every `user` row is emitted; bid/username/password come from the
    //    matching `user_info` row and are NULL when none exists.
    val insertJoinSql =
      """
        |insert into user_all
        |    select u.id,u.name,u.create_time,ui.bid,ui.username,ui.password
        |    from (select id,name,create_time from `user`) as u
        |    left JOIN (select id as bid,username,password from user_info) as ui
        |    on u.id = ui.bid
      """.stripMargin

    tableEnv.executeSql(createUserSourceDdl)
    tableEnv.executeSql(createUserInfoSourceDdl)
    tableEnv.executeSql(createKafkaSinkDdl)

    // executeSql submits the INSERT job asynchronously and returns without waiting for it.
    // For an INSERT, print() only shows a placeholder affected-row count, not the joined rows;
    // consume the `user_all` topic to see the data.
    // NOTE(review): when run from an IDE, main may return before the job emits anything; on
    // Flink 1.11 the job can be awaited through result.getJobClient. Confirm for your deployment.
    val result: TableResult = tableEnv.executeSql(insertJoinSql)
    result.print()
  }
}

3、创建topic

kafka-topics --create --zookeeper h1:2181,h2:2181,h3:2181 --replication-factor 1 --partitions 1 --topic user_all

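4、消费 user_all topic，查看数据结构。changelog-json 格式中，`data` 为整行数据，`op` 为变更类型（`+I` 表示插入，`-D` 表示撤回/删除）。例如下面 id=3 的两条记录：先以 `-D` 撤回旧行，再以 `+I` 写入新行。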

kafka-console-consumer --zookeeper h1:2181,h2:2181,h3:2181 --from-beginning --topic user_all
{"data":{"id":3,"name":"1","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"-D"}
{"data":{"id":3,"name":"name3","create_time":"2021-02-23 10:55:37","bid":3,"username":"1","password":"333"},"op":"+I"}
{"data":{"id":4,"name":"1","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"-D"}
{"data":{"id":4,"name":"name4","create_time":"2021-02-23 14:56:55","bid":4,"username":"1","password":"4"},"op":"+I"}

 

FlinkCDC Mysql到Kafka

上一篇:Mysql之Mycat读写分离及分库分表


下一篇:PLSQL 12.0.6注册码