cdc-Debezium

文章目录

简介

Debezium 是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以查看这些更改并对其做出响应。Debezium 将每个数据库表中的所有行级更改记录在一个更改事件流中,应用程序只需读取这些流以查看更改事件发生的顺序。

使用左侧的链接查找 Debezium 的文档,包括各种 Debezium连接器。如果你想避开Debezium是你的头或它是如何工作的,我们建议从我们的教程和本次会议由的QCon旧金山CDC用例Debezium。

功能

Debezium 是一组用于 Apache Kafka Connect 的源连接器。每个连接器通过使用该数据库的变更数据捕获 (CDC) 功能从不同的数据库中获取变更。与其他方法(例如轮询或双重写入)不同,Debezium 实现的基于日志的 CDC:

确保捕获所有数据更改。
以极低的延迟生成更改事件,同时避免频繁轮询所需的 CPU 使用率增加。例如,对于 MySQL 或 PostgreSQL,延迟在毫秒范围内。
要求没有改变你的数据模型,如“最近更新”一栏。
可以捕获删除。
可以捕获旧记录状态和其他元数据,例如事务 ID 和导致查询,具体取决于数据库的功能和配置。
https://debezium.io/blog/2018/07/19/advantages-of-log-based-change-data-capture/

Debezium 连接器通过一系列相关功能和选项捕获数据变化:
快照:如果连接器已启动且并非所有日志仍然存在,则可以选择获取数据库当前状态的初始快照。通常,当数据库运行了一段时间并丢弃了事务恢复或复制不再需要的事务日志时,就会出现这种情况。有多种执行快照的模式。请参阅您正在使用的连接器的文档。

过滤器:您可以使用包含/排除列表过滤器配置捕获的模式、表和列的集合。
屏蔽:可以屏蔽来自特定列的值,例如,当它们包含敏感数据时。

监控:大多数连接器都可以使用 JMX 进行监控。

即用型消息转换:
消息路由
基于内容的路由
为关系连接器和MongoDB 连接器提取新记录状态

过滤
从事务发件箱表路由事件

架构

Apache Kafka Connect部署 Debezium 。

Kafka Connect 是一个用于实现和操作的框架和运行时:

将记录发送到 Kafka 的源连接器,例如 Debezium

将记录从 Kafka 主题传播到其他系统的接收器连接器

下图显示了基于 Debezium 的变更数据捕获管道的架构:

cdc-Debezium
如图所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium 连接器以捕获对这两种类型数据库的更改。每个 Debezium 连接器都建立与其源数据库的连接:

MySQL 连接器使用客户端库来访问binlog.

PostgreSQL 连接器从逻辑复制流中读取。

除了 Kafka 代理之外,Kafka Connect 还作为单独的服务运行。

默认情况下,来自一个数据库表的更改将写入名称与表名称对应的 Kafka 主题。如果需要,您可以通过配置 Debezium 的主题路由转换来调整目标主题名称。例如,您可以:

将记录路由到名称与表名不同的主题

将多个表的更改事件记录流式传输到单个主题中

更改事件记录在 Apache Kafka 中后,Kafka Connect 生态系统中的不同连接器可以将记录流式传输到其他系统和数据库(例如 Elasticsearch、数据仓库和分析系统)或缓存(例如 Infinispan)。根据选择的接收器连接器,您可能需要配置 Debezium 的新记录状态提取转换。这个 Kafka Connect SMT 将after结构从 Debezium 的更改事件传播到接收器连接器。这代替了默认传播的详细更改事件记录。

Debezium 服务器

另一种部署 Debezium 的方法是使用Debezium 服务器。Debezium 服务器是一个可配置的、随时可用的应用程序,它将更改事件从源数据库流式传输到各种消息传递基础设施。

下图显示了使用 Debezium 服务器的变更数据捕获管道的架构:

cdc-Debezium
Debezium 服务器配置为使用 Debezium 源连接器之一来捕获源数据库中的更改。更改事件可以序列化为不同的格式,例如 JSON 或 Apache Avro,然后将发送到各种消息传递基础设施之一,例如 Amazon Kinesis、Google Cloud Pub/Sub 或 Apache Pulsar。

嵌入式引擎

然而,使用 Debezium 连接器的另一种方法是嵌入式引擎。在这种情况下,Debezium 不会通过 Kafka Connect 运行,而是作为嵌入到您的自定义 Java 应用程序中的库运行。这对于在您的应用程序本身内使用更改事件非常有用,而无需部署完整的 Kafka 和 Kafka Connect 集群,或者用于将更改流式传输到 Amazon Kinesis 等替代消息传递代理。你可以找到一个例子在实例库中的后者。

安装

安装 Debezium
安装和使用 Debezium 连接器的方法有多种,因此我们记录了一些最常用的方法。

安装 Debezium 连接器
如果您已经安装了Zookeeper、Kafka和Kafka Connect,那么使用 Debezium 的连接器之一很容易。只需下载一个或多个连接器插件档案(见下文),将它们的文件解压缩到您的 Kafka Connect 环境中,然后将解压缩的插件的父目录添加到 Kafka Connect 的插件路径中。如果还不是这种情况,请使用plugin.path配置属性在您的工作器配置中指定插件路径(例如connect-distributed.properties)。例如,假设您已下载 Debezium MySQL 连接器存档并将其内容提取到/kafka/connect/debezium-connector-mysql. 然后你会在工作配置中指定以下内容:

plugin.path=/kafka/connect
重新启动 Kafka Connect 进程以获取新的 JAR。

连接器插件可从 Maven 获得:

MySQL 连接器插件存档

Postgres 连接器插件存档

MongoDB 连接器插件存档

SQL Server 连接器插件存档

Oracle Connector 插件存档(孵化中)

Db2 连接器插件存档(孵化中)

Cassandra 插件存档(孵化)

Vitess 插件存档(孵化)

如果您喜欢不可变容器,那么请查看Debezium 的Apache Kafka、Kafka Connect 和 Apache Zookeeper容器映像(DockerHub 上的替代来源),其中不同的 Debezium 连接器已经预先安装并准备就绪。我们的教程甚至会引导您使用这些图像,这是了解 Debezium 的全部内容的好方法。当然你也可以在 Kubernetes 和OpenShift上运行 Debezium 。为此,建议使用Strimzi Kubernetes Operator。它允许通过自定义 Kubernetes 资源以声明方式部署 Apache Kafka、Kafka Connect 甚至连接器。

默认情况下,目录/kafka/connect被 Debezium Docker 镜像用作 Kafka Connect 的插件目录。因此,您可能希望使用的任何其他连接器都应添加到该目录中。或者,您可以通过KAFKA_CONNECT_PLUGINS_DIR在启动容器时指定环境变量(例如-e KAFKA_CONNECT_PLUGINS_DIR=/kafka/connect/,/path/to/further/plugins)向插件路径添加更多目录。在使用 Confluent 提供的 Kafka Connect 的 Docker 镜像时,您可以指定CONNECT_PLUGIN_PATH环境变量来实现相同的效果。

并不是说运行 Debezium 连接器需要 Java 8 或更高版本。

使用快照版本
Debezium 每晚在 Sonatype 快照存储库中执行构建和部署。如果您想尝试最新的或验证您感兴趣的错误修复,请使用来自oss.sonatype.org 的插件或查看本文档的主分支版本以获取指向每个连接器插件工件的直接链接。安装过程与常规版本相同。

使用 Debezium 连接器
要使用连接器为特定源服务器/集群生成更改事件,只需为MySQL 连接器、 Postgres 连接器、 MongoDB 连接器、 SQL Server 连接器、 Oracle 连接器、 Db2 连接器、 Cassandra 连接器或Vitess 连接器创建一个配置文件 ,然后使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。当连接器启动时,它将连接到源并为每个插入、更新和删除的行或文档生成事件。

有关更多信息,请参阅 Debezium连接器文档。

配置 Debezium 主题
Debezium 使用(通过 Kafka Connect 或直接)多个主题来存储数据。这些主题必须由管理员或 Kafka 本身通过启用主题的自动创建来创建。有一些适用于主题的限制和建议:

数据库历史主题(适用于 MySQL 和 SQL Server 的 Debezium 连接器)

无限(或非常长)保留(无压缩!)

生产的复制因子至少为 3

单分区

其他主题

可选地,启用日志压缩(如果您希望只保留给定记录的最后一个更改事件);在这种情况下,应该配置 Apache Kafka 中的min.compaction.lag.ms和delete.retention.ms主题级别设置,以便消费者有足够的时间接收所有事件并删除标记;具体来说,这些值应该大于您预期的接收器连接器的最大停机时间,例如在更新它们时

在生产中复制

单分区

您可以放宽单分区规则,但您的应用程序必须处理数据库中不同行的乱序事件(单行的事件仍然是完全有序的)。如果使用多个分区,Kafka默认会通过对key进行hash来确定分区。其他分区策略需要使用 SMT 为每个记录设置分区号。

有关可自定义的主题自动创建(自 Kafka Connect 2.6.0 起可用),请参阅自定义主题自动创建

使用 Debezium 库
尽管 Debezium 旨在用作交钥匙服务,但所有 JAR 和其他工件都可以在Maven Central 中使用。

我们确实提供了一个小型库,因此应用程序可以嵌入任何 Kafka Connect 连接器并使用直接从源系统读取的数据更改事件。这提供了一个轻量级系统(因为不需要 Zookeeper、Kafka 和 Kafka Connect 服务),但因此它不是容错或可靠的,因为应用程序必须管理和维护通常保存在 Kafka 的分布式和复制日志中的所有状态. 它非常适合在测试中使用,经过仔细考虑,它可能在某些应用程序中有用。

快速开始

教程
本教程演示了如何使用 Debezium 监控 MySQL 数据库。随着数据库中的数据发生变化,您将看到生成的事件流。

在本教程中,您将启动 Debezium 服务,使用一个简单的示例数据库运行 MySQL 数据库服务器,并使用 Debezium 监视数据库的更改。

先决条件
Docker 已安装并正在运行。

本教程使用 Docker 和 Debezium Docker 镜像来运行所需的服务。您应该使用最新版本的 Docker。有关更多信息,请参阅Docker 引擎安装文档。

对于本教程,不要在虚拟机中运行 Docker(使用 Docker Machine)。

这个例子也可以使用 Podman 运行。有关更多信息,请参阅Podman

Debezium 简介
Debezium 是一个分布式平台,可将您现有的数据库转换为事件流,因此应用程序可以查看并立即响应数据库中的每个行级更改。

Debezium 构建在Apache Kafka之上,并提供与Kafka Connect兼容的连接器,用于监控特定的数据库管理系统。Debezium 在 Kafka 日志中记录数据更改的历史记录,从您的应用程序使用它们的位置。这使您的应用程序可以轻松正确且完整地使用所有事件。即使您的应用程序意外停止,它也不会错过任何内容:当应用程序重新启动时,它将继续使用它停止的事件。

Debezium 包括多个连接器。在本教程中,您将使用MySQL 连接器。

启动服务
使用 Debezium 需要三个独立的服务: ZooKeeper、Kafka 和 Debezium 连接器服务。在本教程中,您将使用Docker和Debezium Docker 映像设置每个服务的单个实例。

要启动本教程所需的服务,您必须:

启动动物园管理员

启动卡夫卡

启动 MySQL 数据库

启动 MySQL 命令行客户端

启动卡夫卡连接

使用 Docker 运行 Debezium 的注意事项
本教程使用Docker和Debezium Docker 镜像来运行 ZooKeeper、Kafka、Debezium 和 MySQL 服务。在单独的容器中运行每个服务可以简化设置,以便您可以看到 Debezium 的运行情况。

在生产环境中,您将运行每个服务的多个实例以提供性能、可靠性、复制和容错。通常,您要么将这些服务部署在OpenShift或Kubernetes 等管理多个主机和机器上运行的多个 Docker 容器的平台上,要么安装在专用硬件上。

在使用 Docker 运行 Debezium 时,您应该注意以下注意事项:

ZooKeeper 和 Kafka 的容器是短暂的。

ZooKeeper 和 Kafka 通常会将它们的数据本地存储在容器中,这需要您将目录作为卷挂载到主机上。这样,当容器停止时,持久化的数据仍然存在。但是,本教程会跳过此设置 - 当容器停止时,所有持久化数据都将丢失。这样,当您完成本教程时,清理工作就很简单了。

有关存储持久数据的更多信息,请参阅Docker 映像的文档。

本教程要求您在不同的容器中运行每个服务。

为避免混淆,您将在单独的终端中在前台运行每个容器。这样,容器的所有输出都将显示在用于运行它的终端中。

Docker 还允许您以分离模式(带有-d选项)运行容器,其中启动容器并docker立即返回命令。但是,分离模式容器不会在终端中显示其输出。要查看输出,您需要使用该命令。有关更多信息,请参阅 Docker 文档。docker logs --follow --name

启动动物园管理员
ZooKeeper 是您必须启动的第一个服务。

程序
打开终端并使用它在容器中启动 ZooKeeper。

此命令使用debezium/zookeeper镜像的1.6 版运行一个新容器:

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6
-it
容器是交互式的,这意味着终端的标准输入和输出都附加到容器上。

–rm
容器将在停止时被移除。

–name zookeeper
容器的名称。

-p 2181:2181 -p 2888:2888 -p 3888:3888
将容器的三个端口映射到 Docker 主机上的相同端口。这使其他容器(和容器外的应用程序)能够与 ZooKeeper 通信。

如果您使用 Podman,请运行以下命令:

$ sudo podman pod create --name=dbz --publish “9092,3306,8083”
$ sudo podman run -it --rm --name zookeeper --pod dbz debezium/zookeeper:1.6
验证 ZooKeeper 已启动并正在侦听端口2181。

您应该会看到类似于以下内容的输出:

Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1

port 0.0.0.0/0.0.0.0:2181
此行表示 ZooKeeper 已准备好并正在侦听端口 2181。当 ZooKeeper 生成它时,终端将继续显示其他输出。
启动卡夫卡
启动 ZooKeeper 后,您可以在新容器中启动 Kafka。

Debezium 1.6.1.Final 已经针对多个版本的 Kafka Connect 进行了测试。请参阅Debezium 测试矩阵以确定 Debezium 和 Kafka Connect 之间的兼容性。

程序
打开一个新终端并使用它在容器中启动 Kafka。

此命令使用debezium/kafka镜像的1.6 版运行一个新容器:

$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6
-it
容器是交互式的,这意味着终端的标准输入和输出都附加到容器上。

–rm
容器将在停止时被移除。

–name kafka
容器的名称。

-p 9092:9092
9092将容器中的端口映射到 Docker 主机上的相同端口,以便容器外的应用程序可以与 Kafka 通信。

–link zookeeper:zookeeper
告诉容器它可以在容器中找到 ZooKeeper zookeeper,它运行在同一个 Docker 主机上。

如果您使用 Podman,请运行以下命令:

$ sudo podman run -it --rm --name kafka --pod dbz debezium/kafka:1.6
在本教程中,您将始终从 Docker 容器内连接到 Kafka。这些容器中的任何一个都可以kafka通过链接到容器来与容器进行通信。如果您需要从Docker 容器外部连接到 Kafka ,则必须设置-e选项以通过 Docker 主机公布 Kafka 地址(-e ADVERTISED_HOST_NAME=后跟Docker 主机的 IP 地址或可解析主机名)。

验证 Kafka 是否已启动。

您应该会看到类似于以下内容的输出:


2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging c l a s s @ 70 ] − C l u s t e r I D = L P t c B F x z R v O z D S X h c 6 A a m A . . . 2017 − 09 − 2107 : 16 : 59 , 649 − I N F O [ m a i n : L o g g i n g class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA ... 2017-09-21 07:16:59,649 - INFO [main:Logging class@70]−ClusterID=LPtcBFxzRvOzDSXhc6AamA...2017−09−2107:16:59,649−INFO[main:Loggingclass@70] - [Kafka Server 1], started
Kafka 代理已成功启动并准备好进行客户端连接。当 Kafka 生成它时,终端将继续显示额外的输出。
启动 MySQL 数据库
此时,您已经启动了 ZooKeeper 和 Kafka,但您仍然需要一个数据库服务器,Debezium 可以从中捕获更改。在此过程中,您将使用示例数据库启动 MySQL 服务器。

程序
打开一个新终端,并使用它来启动一个新容器,该容器运行一个预先配置了inventory数据库的 MySQL 数据库服务器。

该命令运行使用的1.6版本新的容器debezium/example-mysql的图像,这是基于所述的MySQL:5.7的图像。它还定义并填充示例inventory数据库:

$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.6
-it
容器是交互式的,这意味着终端的标准输入和输出都附加到容器上。

–rm
容器将在停止时被移除。

–name mysql
容器的名称。

-p 3306:3306
3306将容器中的端口(默认 MySQL 端口)映射到 Docker 主机上的相同端口,以便容器外的应用程序可以连接到数据库服务器。

-e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw
创建具有 Debezium MySQL 连接器所需的最低权限的用户和密码。

如果您使用 Podman,请运行以下命令:

$ sudo podman run -it --rm --name mysql --pod dbz -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.6
验证 MySQL 服务器是否启动。

MySQL 服务器会随着配置的修改而启动和停止几次。您应该会看到类似于以下内容的输出:


017-09-21T07:18:50.824629Z 0 [Note] mysqld: ready for connections.
Version: ‘5.7.19-log’ socket: ‘/var/run/mysqld/mysqld.sock’ port: 3306 MySQL Community Server (GPL)
启动 MySQL 命令行客户端
启动 MySQL 后,启动 MySQL 命令行客户端,以便访问示例inventory数据库。

程序
打开一个新终端,并使用它在容器中启动 MySQL 命令行客户端。

此命令使用mysql:5.7映像运行一个新容器,并定义一个 shell 命令以使用正确的选项运行 MySQL 命令行客户端:

$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c ‘exec mysql -h" M Y S Q L P O R T 3 30 6 T C P A D D R " − P " MYSQL_PORT_3306_TCP_ADDR" -P" MYSQLP​ORT3​306T​CPA​DDR"−P"MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"’
-it
容器是交互式的,这意味着终端的标准输入和输出都附加到容器上。

–rm
容器将在停止时被移除。

–name mysqlterm
容器的名称。

–link mysql
将容器链接到mysql容器。

如果您使用 Podman,请运行以下命令:

$ sudo podman run -it --rm --name mysqlterm --pod dbz --rm mysql:5.7 sh -c ‘exec mysql -h 0.0.0.0 -uroot -pdebezium’
验证 MySQL 命令行客户端是否已启动。

您应该会看到类似于以下内容的输出:

mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.17-log MySQL Community Server (GPL)

Copyright © 2000, 2016, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type ‘help;’ or ‘\h’ for help. Type ‘\c’ to clear the current input statement.

mysql>
在mysql>命令提示符下,切换到库存数据库:

mysql> use inventory;
列出数据库中的表:

mysql> show tables;
±--------------------+
| Tables_in_inventory |
±--------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
±--------------------+
6 rows in set (0.00 sec)
使用 MySQL 命令行客户端浏览数据库并查看数据库中预加载的数据。

例如:

mysql> SELECT * FROM customers;
±-----±-----------±----------±----------------------+
| id | first_name | last_name | email |
±-----±-----------±----------±----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
±-----±-----------±----------±----------------------+
4 rows in set (0.00 sec)
启动 Kafka Connect
启动 MySQL 并inventory使用 MySQL 命令行客户端连接数据库后,启动 Kafka Connect 服务。该服务公开了一个 REST API 来管理 Debezium MySQL 连接器。

程序
打开一个新终端,并使用它在容器中启动 Kafka Connect 服务。

此命令使用 1.6 版本的debezium/connect映像运行一个新容器:

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.6
-it
容器是交互式的,这意味着终端的标准输入和输出都附加到容器上。

–rm
容器将在停止时被移除。

–name connect
容器的名称。

-p 8083:8083
8083将容器中的端口映射到 Docker 主机上的相同端口。这使得容器外的应用程序能够使用 Kafka Connect 的 REST API 来设置和管理新的容器实例。

-e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses
设置 Debezium 映像所需的环境变量。

–link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql
将容器链接到运行 ZooKeeper、Kafka 和 MySQL 服务器的容器。

如果您使用 Podman,请运行以下命令:

$ sudo podman run -it --rm --name connect --pod dbz -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.6
如果您提供–hostname命令选项,则 Kafka Connect REST API 将不会侦听localhost接口。当暴露 REST 端口时,这可能会导致问题。

如果这是一个问题,则设置环境变量REST_HOST_NAME=0.0.0.0以确保可以从所有接口访问 REST API。

验证 Kafka Connect 是否已启动并准备好接受连接。

您应该会看到类似于以下内容的输出:


2020-02-06 15:48:33,939 INFO || Kafka version: 2.4.0 [org.apache.kafka.common.utils.AppInfoParser]

2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
使用 Kafka Connect REST API 检查 Kafka Connect 服务的状态。

Kafka Connect 公开了一个 REST API 来管理 Debezium 连接器。为了与Kafka Connect服务进行通信,您可以使用该curl命令将API请求发送到Docker主机的8083端口(您在connect启动Kafka Connect时映射到容器中的8083端口)。

这些命令使用localhost. 如果您使用的是非本地 Docker 平台(例如 Docker Toolbox),请替换localhost为您的 Docker 主机的 IP 地址。

打开一个新终端并检查 Kafka Connect 服务的状态:

$ curl -H “Accept:application/json” localhost:8083/
{“version”:“2.7.0”,“commit”:“cb8625948210849f”}
响应显示 Kafka Connect 版本 2.7.0 正在运行。
检查向 Kafka Connect 注册的连接器列表:

$ curl -H “Accept:application/json” localhost:8083/connectors/
[]
目前没有在 Kafka Connect 注册连接器。
部署 MySQL 连接器
启动 Debezium 和 MySQL 服务后,您就可以部署 Debezium MySQL 连接器,以便它可以开始监视示例 MySQL 数据库 ( inventory)。

此时,您正在运行 Debezium 服务、带有示例inventory数据库的 MySQL 数据库服务器以及连接到数据库的 MySQL 命令行客户端。要部署 MySQL 连接器,您必须:

注册 MySQL 连接器以监控inventory数据库

连接器注册后,它将开始监视数据库服务器的,binlog并为发生更改的每一行生成更改事件。

观看 MySQL 连接器启动

在连接器启动时查看 Kafka Connect 日志输出可帮助您更好地了解它必须完成的每项任务,然后才能开始监视binlog.

注册连接器来监控inventory数据库
通过注册 Debezium MySQL 连接器,连接器将开始监视 MySQL 数据库服务器的binlog. 该binlog记录的所有数据库的事务(如改变各行,并更改模式)。当数据库中的一行发生更改时,Debezium 会生成一个更改事件。

通常,您可能会使用 Kafka 工具手动创建必要的主题,包括指定副本数。但是,在本教程中,Kafka 配置为仅使用一个副本自动创建主题。

程序
查看您将注册的 Debezium MySQL 连接器的配置。

在注册连接器之前,您应该熟悉其配置。在下一步中,您将注册以下连接器:

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”,
“database.include.list”: “inventory”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”
}
}
连接器的名称。
连接器的配置。
任何时候只应运行一项任务。因为 MySQL 连接器读取 MySQL 服务器的binlog,所以使用单个连接器任务可确保正确的顺序和事件处理。Kafka Connect 服务使用连接器来启动一个或多个完成工作的任务,并自动将正在运行的任务分配到整个 Kafka Connect 服务集群。如果任何服务停止或崩溃,这些任务将重新分配给正在运行的服务。
数据库主机,它是运行 MySQL 服务器的 Docker 容器的名称 ( mysql)。Docker 操作容器内的网络堆栈,以便可以/etc/hosts使用容器名称作为主机名来解析每个链接的容器。如果 MySQL 在正常网络上运行,您将为该值指定 IP 地址或可解析的主机名。
唯一的服务器 ID 和名称。服务器名称是 MySQL 服务器或服务器集群的逻辑标识符。此名称将用作所有 Kafka 主题的前缀。
只会inventory检测数据库中的更改。
连接器将使用此代理(您向其发送事件的同一个代理)和主题名称在 Kafka 中存储数据库模式的历史记录。重新启动后,连接器将恢复binlog在连接器应该开始读取的时间点存在的数据库模式。
有关更多信息,请参阅MySQL 连接器配置属性。

打开一个新终端,并使用curl命令注册 Debezium MySQL 连接器。

此命令使用 Kafka Connect 服务的 API 提交POST针对/connectors资源的请求,其中包含描述新连接器(称为inventory-connector)的 JSON 文档。

此命令用于localhost连接到 Docker 主机。如果您使用的是非本地 Docker 平台,请替换localhost为您的 Docker 主机的 IP 地址。

$ curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{ “name”: “inventory-connector”, “config”: { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “mysql”, “database.port”: “3306”, “database.user”: “debezium”, “database.password”: “dbz”, “database.server.id”: “184054”, “database.server.name”: “dbserver1”, “database.include.list”: “inventory”, “database.history.kafka.bootstrap.servers”: “kafka:9092”, “database.history.kafka.topic”: “dbhistory.inventory” } }’
Windows 用户可能需要转义双引号。例如:

$ curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{ “name”: “inventory-connector”, “config”: { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “mysql”, “database.port”: “3306”, “database.user”: “debezium”, “database.password”: “dbz”, “database.server.id”: “184054”, “database.server.name”: “dbserver1”, “database.include.list”: “inventory”, “database.history.kafka.bootstrap.servers”: “kafka:9092”, “database.history.kafka.topic”: “dbhistory.inventory” } }’
否则,您可能会看到如下错误:

{“error_code”:500,“message”:“Unexpected character (‘n’ (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]”}
如果您使用 Podman,请运行以下命令:

$ curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{ “name”: “inventory-connector”, “config”: { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “0.0.0.0”, “database.port”: “3306”, “database.user”: “debezium”, “database.password”: “dbz”, “database.server.id”: “184054”, “database.server.name”: “dbserver1”, “database.include.list”: “inventory”, “database.history.kafka.bootstrap.servers”: “0.0.0.0:9092”, “database.history.kafka.topic”: “dbhistory.inventory” } }’
验证inventory-connector是否包含在连接器列表中:

$ curl -H “Accept:application/json” localhost:8083/connectors/
[“inventory-connector”]
查看连接器的任务:

$ curl -i -X GET -H “Accept:application/json” localhost:8083/connectors/inventory-connector
您应该会看到类似于以下内容的响应(为便于阅读而格式化):

HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)

{
“name”: “inventory-connector”,

“tasks”: [
{
“connector”: “inventory-connector”,
“task”: 0
}
]
}
连接器正在运行单个任务 (task 0) 来完成其工作。连接器仅支持单个任务,因为 MySQL 将其所有活动记录在一个连续的binlog. 这意味着连接器只需要一个读取器即可获得所有事件的一致、有序的视图。
看着连接器启动
当您注册连接器时,它会在 Kafka Connect 容器中生成大量日志输出。通过查看此输出,您可以更好地了解连接器从创建到开始读取 MySQL 服务器的binlog.

注册inventory-connector连接器后,您可以查看 Kafka Connect 容器 ( connect) 中的日志输出以跟踪连接器的状态。

前几行显示了inventory-connector正在创建和启动的连接器 ( ):


2017-09-21 07:23:59,051 INFO || Connector inventory-connector config updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || Rebalance started [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || Finished stopping tasks in preparation for rebalance [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || (Re-)joining group 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO || Successfully joined group 1 with generation 2 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO || Joined group and got assignment: Assignment{error=0, leader=‘connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94’, leaderUrl=‘http://172.17.0.7:9092/’, offset=1, connectorIds=[inventory-connector], taskIds=[]} [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO || Starting connectors and tasks using config offset 1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO || Starting connector inventory-connector [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

再往下,您应该会看到来自连接器的如下输出:


2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,584 INFO MySQL|dbserver1|task Found no existing offset, so preparing to perform a snapshot [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:24:01,614 INFO || Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2017-09-21 07:24:01,615 INFO MySQL|dbserver1|snapshot Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull with user ‘debezium’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,617 INFO MySQL|dbserver1|snapshot Snapshot is using user ‘debezium’ with these MySQL grants: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘debezium’@’%’ [io.debezium.connector.mysql.SnapshotReader]

Debezium 日志输出使用映射的诊断上下文(MDC) 在日志输出中提供特定于线程的信息,并使您更容易了解多线程 Kafka Connect 服务中发生的情况。这包括连接器类型(MySQL在上面的日志消息中)、连接器的逻辑名称(dbserver1上面)和连接器的活动(task、snapshot和binlog)。

在上面的日志输出中,前几行涉及task连接器的活动,并报告一些簿记信息(在本例中,连接器启动时没有预先偏移)。接下来的三行涉及snapshot连接器的活动,并报告正在使用debeziumMySQL 用户以及与该用户关联的 MySQL 授权启动快照。

如果连接器无法连接,或者它没有看到任何表或binlog,请检查这些授权以确保上面列出的所有那些都包括在内。

接下来,连接器报告它找到的相关 MySQL 服务器设置。最重要的一个是binlog_format,它被设置为ROW:

2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot MySQL server variables related to change data capture: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_checksum = CRC32 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_direct_non_transactional_updates = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_error_action = ABORT_SERVER [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_format = ROW [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_delay = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_no_delay_count = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_gtid_simple_recovery = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_max_flush_queue_time = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_order_commits = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_row_image = FULL [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_rows_query_log_events = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_stmt_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_client = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_connection = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_database = latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_filesystem = binary [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_results = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_server = latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_system = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_sets_dir = /usr/share/mysql/charsets/ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_connection = utf8_general_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_database = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_server = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot enforce_gtid_consistency = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot gtid_executed_compression_period = 1000 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_mode = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_next = AUTOMATIC [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_owned = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_purged = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_api_enable_binlog = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_locks_unsafe_for_binlog = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_version = 5.7.19 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot log_statements_unsafe_for_binlog = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_size = 1073741824 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_stmt_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot protocol_version = 10 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot session_track_gtids = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot slave_type_conversions = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot sync_binlog = 1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot system_time_zone = UTC [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot time_zone = SYSTEM [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tls_version = TLSv1,TLSv1.1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tx_isolation = REPEATABLE-READ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tx_read_only = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version = 5.7.19-log [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_comment = MySQL Community Server (GPL) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_compile_machine = x86_64 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot version_compile_os = Linux [io.debezium.connector.mysql.SnapshotReader]

接下来,连接器报告构成快照操作的九个步骤:


2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot Step 0: disabling autocommit and enabling repeatable read transactions [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,631 INFO MySQL|dbserver1|snapshot Step 1: start transaction with consistent snapshot [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,634 INFO MySQL|dbserver1|snapshot Step 2: flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,636 INFO MySQL|dbserver1|snapshot Step 3: read binlog position of MySQL primary server [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot using binlog ‘mysql-bin.000003’ at position ‘154’ and gtid ‘’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot Step 4: read list of available databases [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,639 INFO MySQL|dbserver1|snapshot Step 5: read list of available tables in each database [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including ‘inventory.customers’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including ‘inventory.orders’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including ‘inventory.products’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including ‘inventory.products_on_hand’ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,642 INFO MySQL|dbserver1|snapshot ‘mysql.columns_priv’ is filtered out, discarding [io.debezium.connector.mysql.SnapshotReader]

2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot Step 6: generating DROP and CREATE statements to reflect current database schemas: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,679 INFO MySQL|dbserver1|snapshot SET character_set_server=latin1, collation_server=latin1_swedish_ci; [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,724 WARN MySQL|dbserver1|task Error while fetching metadata with correlation id 1 : {dbhistory.inventory=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:01,853 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS inventory.products_on_hand [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,861 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS inventory.customers [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,864 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS inventory.orders [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,866 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS inventory.products [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,881 INFO MySQL|dbserver1|snapshot DROP DATABASE IF EXISTS inventory [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,889 INFO MySQL|dbserver1|snapshot CREATE DATABASE inventory [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,893 INFO MySQL|dbserver1|snapshot USE inventory [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,914 INFO MySQL|dbserver1|snapshot CREATE TABLE customers (
id int(11) NOT NULL AUTO_INCREMENT,
first_name varchar(255) NOT NULL,
last_name varchar(255) NOT NULL,
email varchar(255) NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY email (email)
) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,932 INFO MySQL|dbserver1|snapshot CREATE TABLE orders (
order_number int(11) NOT NULL AUTO_INCREMENT,
order_date date NOT NULL,
purchaser int(11) NOT NULL,
quantity int(11) NOT NULL,
product_id int(11) NOT NULL,
PRIMARY KEY (order_number),
KEY order_customer (purchaser),
KEY ordered_product (product_id),
CONSTRAINT orders_ibfk_1 FOREIGN KEY (purchaser) REFERENCES customers (id),
CONSTRAINT orders_ibfk_2 FOREIGN KEY (product_id) REFERENCES products (id)
) ENGINE=InnoDB AUTO_INCREMENT=10005 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,937 INFO MySQL|dbserver1|snapshot CREATE TABLE products (
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
description varchar(512) DEFAULT NULL,
weight float DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=110 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,941 INFO MySQL|dbserver1|snapshot CREATE TABLE products_on_hand (
product_id int(11) NOT NULL,
quantity int(11) NOT NULL,
PRIMARY KEY (product_id),
CONSTRAINT products_on_hand_ibfk_1 FOREIGN KEY (product_id) REFERENCES products (id)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,947 INFO MySQL|dbserver1|snapshot Step 7: releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,949 INFO MySQL|dbserver1|snapshot Step 7: blocked writes to MySQL for a total of 00:00:00.312 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,950 INFO MySQL|dbserver1|snapshot Step 8: scanning contents of 4 tables while still in transaction [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,953 INFO MySQL|dbserver1|snapshot Step 8: - scanning table ‘inventory.customers’ (1 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,958 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table ‘inventory.customers’ after 00:00:00.005 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,959 INFO MySQL|dbserver1|snapshot Step 8: - scanning table ‘inventory.orders’ (2 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,014 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table ‘inventory.orders’ after 00:00:00.055 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,016 INFO MySQL|dbserver1|snapshot Step 8: - scanning table ‘inventory.products’ (3 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,017 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table ‘inventory.products’ after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,018 INFO MySQL|dbserver1|snapshot Step 8: - scanning table ‘inventory.products_on_hand’ (4 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,019 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table ‘inventory.products_on_hand’ after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 8: scanned 26 rows in 4 tables in 00:00:00.069 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 9: committing transaction [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,021 INFO MySQL|dbserver1|snapshot Completed snapshot in 00:00:00.405 [io.debezium.connector.mysql.SnapshotReader]

这些步骤中的每一个都会报告连接器正在执行的操作以执行一致的快照。例如,步骤 6 涉及create对正在捕获的表的 DDL语句进行逆向工程。步骤 7 在获取全局写锁后仅 0.3 秒释放全局写锁,步骤 8 读取每个表中的所有行并报告所用时间和找到的行数。在这种情况下,连接器仅用了 0.38 秒就完成了其一致的快照。

数据库的快照过程将花费更长的时间,但连接器输出足够的日志消息,您可以跟踪它正在处理的内容,即使表有大量行也是如此。虽然在快照过程开始时使用了排他写锁,但即使对于大型数据库,它也不应该持续很长时间。这是因为在复制任何数据之前释放锁。有关更多信息,请参阅MySQL 连接器文档。

接下来,Kafka Connect 报告一些“错误”。但是,您可以放心地忽略这些警告:这些消息仅意味着创建了新的Kafka 主题,并且 Kafka 必须为每个主题分配一个新的领导者:


2017-09-21 07:24:02,632 WARN || Error while fetching metadata with correlation id 1 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,775 WARN || Error while fetching metadata with correlation id 5 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,910 WARN || Error while fetching metadata with correlation id 9 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,045 WARN || Error while fetching metadata with correlation id 13 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,179 WARN || Error while fetching metadata with correlation id 17 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]

最后,日志输出显示连接器已从其快照模式转换为持续读取 MySQL 服务器的binlog:


Sep 21, 2017 7:24:03 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:7)
2017-09-21 07:24:03,373 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file ‘mysql-bin.000003’, pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:25:01,096 INFO || Finished WorkerSourceTask{id=inventory-connector-0} commitOffsets successfully in 18 ms [org.apache.kafka.connect.runtime.WorkerSourceTask]

查看更改事件
部署 Debezium MySQL 连接器后,它开始监视inventory数据库的数据更改事件。

当您观察连接器启动时,您会看到事件被写入以下带有dbserver1前缀(连接器名称)的主题:

dbserver1
写入所有 DDL 语句的模式更改主题。

dbserver1.inventory.products
捕获数据库products中表的更改事件inventory。

dbserver1.inventory.products_on_hand
捕获数据库products_on_hand中表的更改事件inventory。

dbserver1.inventory.customers
捕获数据库customers中表的更改事件inventory。

dbserver1.inventory.orders
捕获数据库orders中表的更改事件inventory。

在本教程中,您将探索该dbserver1.inventory.customers主题。在本主题中,您将看到不同类型的更改事件,以了解连接器如何捕获它们:

查看创建事件

更新数据库并查看更新事件

删除数据库中的一条记录并查看删除事件

重启Kafka Connect并更改数据库

查看创建事件
通过查看dbserver1.inventory.customers主题,您可以了解 MySQL 连接器如何捕获数据库中的创建事件inventory。在这种情况下,创建事件捕获添加到数据库中的新客户。

程序
打开一个新终端,并使用它启动watch-topic实用程序dbserver1.inventory.customers从主题开始观看主题。

该watch-topic实用程序非常简单且功能有限。应用程序不打算使用它来消费事件。在这种情况下,您将改为使用 Kafka 消费者和提供完整功能和灵活性的适用消费者库。

此命令watch-topic使用 1.6 版debezium/kafka映像在新容器中运行该实用程序:

$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.6 watch-topic -a -k dbserver1.inventory.customers
-a
监视自创建主题以来的所有事件。如果没有此选项,watch-topic将仅显示您开始观看后记录的事件。

-k
指定输出应包含事件的键。在这种情况下,这包含行的主键。

如果您使用 Podman,请运行以下命令:

$ sudo podman run -it --rm --name watcher --pod dbz debezium/kafka:1.6 watch-topic -a -k dbserver1.inventory.customers
该watch-topic实用程序从customers表中返回事件记录。有四个事件,表中的每一行一个。每个事件都采用 JSON 格式,因为这是您配置 Kafka Connect 服务的方式。每个事件有两个 JSON 文档:一个用于键,一个用于值。

您应该会看到类似于以下内容的输出:

Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{“schema”:{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”}],“optional”:false,“name”:“dbserver1.inventory.customers.Key”},“payload”:{“id”:1001}}

此实用程序会持续关注主题,因此只要该实用程序正在运行,任何新事件都会自动出现。

对于最后一个事件,查看密钥的详细信息。

以下是最后一个事件的键的详细信息(为便于阅读而格式化):

{
“schema”:{
“type”:“struct”,
“fields”:[
{
“type”:“int32”,
“optional”:false,
“field”:“id”
}
],
“optional”:false,
“name”:“dbserver1.inventory.customers.Key”
},
“payload”:{
“id”:1004
}
}
The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).

The payload has a single id field, with a value of 1004.

By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.

Review the details of the same event’s value.

本次活动的值显示,该行的创建,并描述它包含的内容(在这种情况下,id,first_name,last_name,和email插入行)。

以下是最后一个事件的值的详细信息(为便于阅读而格式化):

{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“field”: “id”
},
{
“type”: “string”,
“optional”: false,
“field”: “first_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “last_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “email”
}
],
“optional”: true,
“name”: “dbserver1.inventory.customers.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“field”: “id”
},
{
“type”: “string”,
“optional”: false,
“field”: “first_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “last_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “email”
}
],
“optional”: true,
“name”: “dbserver1.inventory.customers.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: true,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “server_id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_sec”
},
{
“type”: “string”,
“optional”: true,
“field”: “gtid”
},
{
“type”: “string”,
“optional”: false,
“field”: “file”
},
{
“type”: “int64”,
“optional”: false,
“field”: “pos”
},
{
“type”: “int32”,
“optional”: false,
“field”: “row”
},
{
“type”: “boolean”,
“optional”: true,
“field”: “snapshot”
},
{
“type”: “int64”,
“optional”: true,
“field”: “thread”
},
{
“type”: “string”,
“optional”: true,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “table”
}
],
“optional”: false,
“name”: “io.debezium.connector.mysql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
}
],
“optional”: false,
“name”: “dbserver1.inventory.customers.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 1004,
“first_name”: “Anne”,
“last_name”: “Kretchmar”,
“email”: “annek@noanswer.org”
},
“source”: {
“version”: “1.6.1.Final”,
“name”: “dbserver1”,
“server_id”: 0,
“ts_sec”: 0,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 154,
“row”: 0,
“snapshot”: true,
“thread”: null,
“db”: “inventory”,
“table”: “customers”
},
“op”: “c”,
“ts_ms”: 1486500577691
}
}
事件的这一部分要长得多,但与事件的key 一样,它也有 aschema和 a payload。该schema包含一个名为卡夫卡连接架构dbserver1.inventory.customers.Envelope(第1版),它可以包含五个领域:

op
包含描述操作类型的字符串值的必填字段。MySQL 连接器的值用于c创建(或插入)、u更新、d删除和r读取(在非初始快照的情况下)。

before
一个可选字段,如果存在,则包含事件发生前行的状态。该结构将由dbserver1.inventory.customers.ValueKafka Connect 模式描述,连接dbserver1器将其用于inventory.customers表中的所有行。

after
可选字段,如果存在,则包含事件发生后行的状态。的结构由相同的描述dbserver1.inventory.customers.Value中使用的卡夫卡连接架构before。

source
包含描述事件源元数据的结构的必填字段,在 MySQL 的情况下,包含多个字段:连接器名称、binlog记录事件的文件名、事件在该binlog文件中出现的位置,事件中的行(如果有多个),受影响的数据库和表的名称,进行更改的 MySQL 线程 ID,此事件是否是快照的一部分,以及 MySQL 服务器(如果可用) ID,以及以秒为单位的时间戳。

ts_ms
一个可选字段,如果存在,则包含连接器处理事件的时间(使用运行 Kafka Connect 任务的 JVM 中的系统时钟)。

事件的 JSON 表示比它们描述的行长得多。这是因为,对于每个事件键和值,Kafka Connect 都提供了描述有效负载的模式。随着时间的推移,这种结构可能会发生变化。然而,在事件本身中拥有键和值的模式使得消费应用程序更容易理解消息,尤其是当它们随着时间的推移而演变时。

Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句更改 MySQL 数据库中的表定义,则连接器会读取这些 DDL 语句并更新其 Kafka Connect 架构。这是每个事件的结构与事件发生时起源的表完全相同的唯一方式。但是,包含单个表的所有事件的 Kafka 主题可能具有对应于表定义的每个状态的事件。

JSON 转换器在每条消息中都包含键和值模式,因此它会产生非常冗长的事件。或者,您可以使用Apache Avro作为序列化格式,从而生成更小的事件消息。这是因为它将每个 Kafka Connect 模式转换为 Avro 模式,并将 Avro 模式存储在单独的 Schema Registry 服务中。因此,当 Avro 转换器序列化事件消息时,它仅放置模式的唯一标识符以及值的 Avro 编码二进制表示。因此,通过网络传输并存储在 Kafka 中的序列化消息比您在此处看到的要小得多。事实上,Avro 转换器能够使用 Avro 模式演化技术来维护模式注册表中每个模式的历史记录。

将事件的键和值模式与inventory数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:

mysql> SELECT * FROM customers;
±-----±-----------±----------±----------------------+
| id | first_name | last_name | email |
±-----±-----------±----------±----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
±-----±-----------±----------±----------------------+
4 rows in set (0.00 sec)
这表明您查看的事件记录与数据库中的记录匹配。

更新数据库并查看更新事件
现在您已经了解了 Debezium MySQL 连接器如何捕获数据库中的创建事件,inventory现在您将更改其中一条记录并查看连接器如何捕获它。

通过完成此过程,您将了解如何查找有关数据库提交中更改内容的详细信息,以及如何比较更改事件以确定与其他更改相关的更改何时发生。

程序
在运行 MySQL 命令行客户端的终端中,运行以下语句:

mysql> UPDATE customers SET first_name=‘Anne Marie’ WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
查看更新的customers表:

mysql> SELECT * FROM customers;
±-----±-----------±----------±----------------------+
| id | first_name | last_name | email |
±-----±-----------±----------±----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
±-----±-----------±----------±----------------------+
4 rows in set (0.00 sec)
切换到正在运行的终端watch-topic以查看新的第五个事件。

通过更改customers表中的记录,Debezium MySQL 连接器生成了一个新事件。您应该会看到两个新的 JSON 文档:一个用于事件的key,另一个用于新事件的value。

下面是细节关键的更新事件(格式化的可读性):

{
“schema”: {
“type”: “struct”,
“name”: “dbserver1.inventory.customers.Key”
“optional”: false,
“fields”: [
{
“field”: “id”,
“type”: “int32”,
“optional”: false
}
]
},
“payload”: {
“id”: 1004
}
}
此键与之前事件的键相同。

这是新事件的value。该部分没有任何变化schema,因此仅显示该payload部分(格式化以方便阅读):

{
“schema”: {…},
“payload”: {
“before”: {
“id”: 1004,
“first_name”: “Anne”,
“last_name”: “Kretchmar”,
“email”: “annek@noanswer.org”
},
“after”: {
“id”: 1004,
“first_name”: “Anne Marie”,
“last_name”: “Kretchmar”,
“email”: “annek@noanswer.org”
},
“source”: {
“name”: “1.6.1.Final”,
“name”: “dbserver1”,
“server_id”: 223344,
“ts_sec”: 1486501486,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 364,
“row”: 0,
“snapshot”: null,
“thread”: 3,
“db”: “inventory”,
“table”: “customers”
},
“op”: “u”,
“ts_ms”: 1486501486308
}
}
该before字段现在具有包含数据库提交之前的值的行的状态。
该after字段现在具有行的更新状态,first_name值现在是Anne Marie。
该source场结构有许多像以前一样的价值观,除了ts_sec和pos领域发生了变化(在file可能在其他情况下发生了变化)。
该op字段的值是现在u,表示此行被更改,因为更新的。
该ts_ms字段显示 Debezium 处理此事件的时间戳。
通过查看该payload部分,您可以了解有关更新事件的几个重要信息:

通过比较before和after结构,您可以确定由于提交而在受影响行中实际更改的内容。

通过查看source结构,您可以找到有关 MySQL 更改记录的信息(提供可追溯性)。

通过将payload事件的部分与同一主题(或不同主题)中的其他事件进行比较,您可以确定该事件是在与另一个事件相同的 MySQL 提交之前、之后还是作为其一部分发生的。

删除数据库中的一条记录并查看删除事件
现在您已经了解了 Debezium MySQL 连接器如何捕获数据库中的创建和更新事件,inventory现在您将删除其中一条记录并查看连接器如何捕获它。

通过完成此过程,您将了解如何查找有关删除事件的详细信息,以及 Kafka 如何使用日志压缩来减少删除事件的数量,同时仍使消费者能够获取所有事件。

程序
在运行 MySQL 命令行客户端的终端中,运行以下语句:

mysql> DELETE FROM customers WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
如果上述命令因违反外键约束而失败,则必须使用以下语句从地址表中删除客户地址的引用:

mysql> DELETE FROM addresses WHERE customer_id=1004;
切换到运行的终端watch-topic以查看两个新事件。

通过删除customers表中的一行,Debezium MySQL 连接器生成了两个新事件。

查看第一个新事件的键和值。

以下是第一个新事件的密钥的详细信息(为了便于阅读而格式化):

{
“schema”: {
“type”: “struct”,
“name”: “dbserver1.inventory.customers.Key”
“optional”: false,
“fields”: [
{
“field”: “id”,
“type”: “int32”,
“optional”: false
}
]
},
“payload”: {
“id”: 1004
}
}
此键与您查看的前两个事件中的键相同。

这是第一个新事件的值(为了可读性而格式化):

{
“schema”: {…},
“payload”: {
“before”: {
“id”: 1004,
“first_name”: “Anne Marie”,
“last_name”: “Kretchmar”,
“email”: “annek@noanswer.org”
},
“after”: null,
“source”: {
“name”: “1.6.1.Final”,
“name”: “dbserver1”,
“server_id”: 223344,
“ts_sec”: 1486501558,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 725,
“row”: 0,
“snapshot”: null,
“thread”: 3,
“db”: “inventory”,
“table”: “customers”
},
“op”: “d”,
“ts_ms”: 1486501558315
}
}
该before字段现在具有随数据库提交删除的行的状态。
该after字段是null因为该行不再存在。
该source场结构具有许多相同的价值观和以前一样,除了ts_sec和pos领域发生了变化(在file可能在其他情况下发生了变化)。
该op字段的值是现在d,这标志着该行已被删除。
该ts_ms字段显示 Debezium 处理此事件的时间戳。
因此,此事件为使用者提供了处理行删除所需的信息。还提供了旧值,因为一些消费者可能要求他们正确处理删除。

查看第二个新事件的键和值。

这是第二个新事件的关键(为了可读性而格式化):

{
“schema”: {
“type”: “struct”,
“name”: “dbserver1.inventory.customers.Key”
“optional”: false,
“fields”: [
{
“field”: “id”,
“type”: “int32”,
“optional”: false
}
]
},
“payload”: {
“id”: 1004
}
}
再一次,此键与您查看的前三个事件中的键完全相同。

这是同一个事件的值(为了可读性而格式化):

{
“schema”: null,
“payload”: null
}
如果 Kafka 设置为日志压缩,如果主题后面至少有一条具有相同键的消息,它将从主题中删除旧消息。最后一个事件称为墓碑事件,因为它有一个键和一个空值。这意味着 Kafka 将删除所有具有相同密钥的先前消息。即使之前的消息将被删除,墓碑事件也意味着消费者仍然可以从头开始阅读主题,不会错过任何事件。

重启Kafka Connect服务
现在您已经了解了 Debezium MySQL 连接器如何捕获create、update和delete事件,您现在将了解它如何在未运行时捕获更改事件。

Kafka Connect 服务会自动管理其注册连接器的任务。因此,如果它脱机,当它重新启动时,它将启动任何非运行任务。这意味着即使 Debezium 没有运行,它仍然可以报告数据库中的更改。

在此过程中,您将停止 Kafka Connect,更改数据库中的一些数据,然后重新启动 Kafka Connect 以查看更改事件。

程序
打开一个新终端并使用它来停止connect运行 Kafka Connect 服务的容器:

$ docker stop connect
该connect容器被停止,卡夫卡Connect服务正常关闭。

因为您使用该–rm选项运行了容器,所以一旦容器停止,Docker 就会将其删除。

当服务关闭时,切换到 MySQL 命令行客户端的终端,并添加一些记录:

mysql> INSERT INTO customers VALUES (default, “Sarah”, “Thompson”, “kitt@acme.com”);
mysql> INSERT INTO customers VALUES (default, “Kenneth”, “Anderson”, “kander@acme.com”);
记录被添加到数据库中。但是,由于 Kafka Connect 未运行,因此 watch-topic不会记录任何更新。

在生产系统中,您将有足够的代理来处理生产者和消费者,并为每个主题维护最少数量的同步副本。因此,如果有足够多的代理失败,以至于不再有最小数量的 ISR,Kafka 将变得不可用。在这种情况下,生产者(如 Debezium 连接器)和消费者将等待 Kafka 集群或网络恢复。这意味着,当数据库中的数据发生更改时,您的使用者可能暂时看不到任何更改事件。这是因为没有产生更改事件。一旦 Kafka 集群重新启动或网络恢复,Debezium 将继续生产更改事件,而您的消费者将在他们停止的地方继续消费事件。

打开一个新终端,并使用它在容器中重新启动 Kafka Connect 服务。

此命令使用您最初启动它时使用的相同选项启动 Kafka Connect:

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.6
Kafka Connect 服务启动,连接到 Kafka,读取前一个服务的配置,并启动注册的连接器,这些连接器将从上次停止的地方继续。

这是这个重新启动的服务的最后几行:


2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,386 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group inventory-connector-dbhistory. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Revoking previously assigned partitions [] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,391 INFO MySQL|dbserver1|task (Re-)joining group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,888 INFO MySQL|dbserver1|task Step 0: Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:38:48,903 INFO MySQL|dbserver1|task MySQL has the binlog file ‘mysql-bin.000003’ required by the connector [io.debezium.connector.mysql.MySqlConnectorTask]
Sep 21, 2017 7:38:49 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:10)
2017-09-21 07:38:49,045 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file ‘mysql-bin.000003’, pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:38:49,046 INFO || Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
这些行表明服务在关闭之前找到了上一个任务先前记录的偏移量,连接到 MySQL 数据库,binlog从该位置开始读取,并从该时间点以来 MySQL 数据库中的任何更改生成事件。

切换到正在运行的终端watch-topic以查看您在 Kafka Connect 离线时创建的两条新记录的事件:

{“schema”:{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”}],“optional”:false,“name”:“dbserver1.inventory.customers.Key”},“payload”:{“id”:1005}} {“schema”:{“type”:“struct”,“fields”:[{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”},{“type”:“string”,“optional”:false,“field”:“first_name”},{“type”:“string”,“optional”:false,“field”:“last_name”},{“type”:“string”,“optional”:false,“field”:“email”}],“optional”:true,“name”:“dbserver1.inventory.customers.Value”,“field”:“before”},{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”},{“type”:“string”,“optional”:false,“field”:“first_name”},{“type”:“string”,“optional”:false,“field”:“last_name”},{“type”:“string”,“optional”:false,“field”:“email”}],“optional”:true,“name”:“dbserver1.inventory.customers.Value”,“field”:“after”},{“type”:“struct”,“fields”:[{“type”:“string”,“optional”:true,“field”:“version”},{“type”:“string”,“optional”:false,“field”:“name”},{“type”:“int64”,“optional”:false,“field”:“server_id”},{“type”:“int64”,“optional”:false,“field”:“ts_sec”},{“type”:“string”,“optional”:true,“field”:“gtid”},{“type”:“string”,“optional”:false,“field”:“file”},{“type”:“int64”,“optional”:false,“field”:“pos”},{“type”:“int32”,“optional”:false,“field”:“row”},{“type”:“boolean”,“optional”:true,“field”:“snapshot”},{“type”:“int64”,“optional”:true,“field”:“thread”},{“type”:“string”,“optional”:true,“field”:“db”},{“type”:“string”,“optional”:true,“field”:“table”}],“optional”:false,“name”:“io.debezium.connector.mysql.Source”,“field”:“source”},{“type”:“string”,“optional”:false,“field”:“op”},{“type”:“int64”,“optional”:true,“field”:“ts_ms”}],“optional”:false,“name”:“dbserver1.inventory.customers.Envelope”,“version”:1},“payload”:{“before”:null,“after”:{“id”:1005,“first_name”:“Sarah”,“last_name”:“Thompson”,“email”:“kitt@acme.com”},“source”:{“version”:“1.6.1.Final”,“name”:“dbserver1”,“server_id”:223344,“ts_sec”:1490635153,“gtid”:null,“file”:“mysql-bin.000003”,“pos”:1046,“row”:0,“snapshot”:null,“thread”:3,“db”:“inventory”,“table”:“customers”},“op”:“c”,“ts_ms”:1490635181455}}
{“schema”:{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”}],“optional”:false,“name”:“dbserver1.inventory.customers.Key”},“payload”:{“id”:1006}} {“schema”:{“type”:“struct”,“fields”:[{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”},{“type”:“string”,“optional”:false,“field”:“first_name”},{“type”:“string”,“optional”:false,“field”:“last_name”},{“type”:“string”,“optional”:false,“field”:“email”}],“optional”:true,“name”:“dbserver1.inventory.customers.Value”,“field”:“before”},{“type”:“struct”,“fields”:[{“type”:“int32”,“optional”:false,“field”:“id”},{“type”:“string”,“optional”:false,“field”:“first_name”},{“type”:“string”,“optional”:false,“field”:“last_name”},{“type”:“string”,“optional”:false,“field”:“email”}],“optional”:true,“name”:“dbserver1.inventory.customers.Value”,“field”:“after”},{“type”:“struct”,“fields”:[{“type”:“string”,“optional”:true,“field”:“version”},{“type”:“string”,“optional”:false,“field”:“name”},{“type”:“int64”,“optional”:false,“field”:“server_id”},{“type”:“int64”,“optional”:false,“field”:“ts_sec”},{“type”:“string”,“optional”:true,“field”:“gtid”},{“type”:“string”,“optional”:false,“field”:“file”},{“type”:“int64”,“optional”:false,“field”:“pos”},{“type”:“int32”,“optional”:false,“field”:“row”},{“type”:“boolean”,“optional”:true,“field”:“snapshot”},{“type”:“int64”,“optional”:true,“field”:“thread”},{“type”:“string”,“optional”:true,“field”:“db”},{“type”:“string”,“optional”:true,“field”:“table”}],“optional”:false,“name”:“io.debezium.connector.mysql.Source”,“field”:“source”},{“type”:“string”,“optional”:false,“field”:“op”},{“type”:“int64”,“optional”:true,“field”:“ts_ms”}],“optional”:false,“name”:“dbserver1.inventory.customers.Envelope”,“version”:1},“payload”:{“before”:null,“after”:{“id”:1006,“first_name”:“Kenneth”,“last_name”:“Anderson”,“email”:“kander@acme.com”},“source”:{“version”:“1.6.1.Final”,“name”:“dbserver1”,“server_id”:223344,“ts_sec”:1490635160,“gtid”:null,“file”:“mysql-bin.000003”,“pos”:1356,“row”:0,“snapshot”:null,“thread”:3,“db”:“inventory”,“table”:“customers”},“op”:“c”,“ts_ms”:1490635181456}}
这些事件是与您之前看到的类似的创建事件。如您所见,Debezium 仍会报告数据库中的所有更改,即使它没有运行(只要在 MySQL 数据库从其binlog丢失的提交中清除之前重新启动它)。

打扫干净
完成本教程后,您可以使用 Docker 停止所有正在运行的容器。

程序
停止每个容器:

$ docker stop mysqlterm watcher connect mysql kafka zookeeper
Docker 停止每个容器。因为您–rm在启动它们时使用了该选项,所以 Docker 也会删除它们。

如果您使用 Podman,请运行以下命令:

$ sudo podman pod kill dbz
$ sudo podman pod rm dbz
验证所有进程是否已停止并已被删除:

$ docker ps -a
如果任何进程仍在运行,请使用或停止它们。docker stop docker stop

参考资料

https://debezium.io/releases/1.6/

https://github.com/debezium/debezium-examples/tree/master/kinesis

上一篇:程序的测量和计时


下一篇:mongoDB管道--数组查询