前言:
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,
用户可以在以下的场景下使用CDC:
使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。
可以在源数据库上实时的物化一个聚合视图
因为只是增量同步,所以可以实时的低延迟的同步数据
使用EventTime join 一个temporal表以便可以获取准确的结果
flink 1.11 将这些changelog提取并转化为table apa和sql,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。
一 创建项目
二 导入pom文件
<properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.4.1</spring-boot.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.4.1</version> <configuration> <mainClass>com.lexue.gmall_logger.GmallLoggerApplication</mainClass> </configuration> <executions> <execution> <id>repackage</id> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
三 创建LoggerController.class
package com.lexue.gmall_logger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class LoggerController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("applog") public String getLogger(@RequestParam("param") String logStr){ //落盘 log.info(logStr); //写入Kafka kafkaTemplate.send("ods_base_log",logStr); return "success"; }
//本地浏览器测试使用 @RequestMapping("test1") public String getLogger(@RequestParam("name") String name, @RequestParam("age") int age){ System.out.println(name + ":" + age); return "success"; } }
四 在resources目录下创建logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="/opt/module/lxz_file/logs" /> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/app.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 将某一个包下日志单独打印日志 这里需要根据读者实际类名填写 --> <logger name="com.lexue.gmall_logger.controller.LoggerController" level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="error" additivity="false"> <appender-ref ref="console" /> </root> </configuration>
五 在reousces创建application.properties文件
# 项目名称 spring.application.name=gmall-logger # 指定使用的端口号 server.port=8081 #============== kafka =================== # 指定Kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=hadoop1:9092 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
注意:项目中的test目录下有个test.class,删除此文件,不然会报错.
六 测试
1.本地IDEA开启客户端
2.服务器开启zk,kafka集群
3.开启Kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic ods_base_log
4.发送数据查看Kafka消费者是否消费
数据正在发送,查看Kafka是否成功消费到
Kafka成功消费
去IDEA看看客户端是否有数据进来
客户端成功抓取到数据
此时,生产数据已经可以成功通过FinkCDC程序顺利写入Kafka主题ods_base_log中.