概念
Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。
逻辑图
Kafka Connect 特性如下:
- Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理
- 支持分布式模式和单机模式部署
- Rest API:通过简单的Rest API管理连接器
- 偏移量管理:针对Source和Sink都有相应的偏移量(Offset)管理方案,程序员无须关心Offset 的提交
- 分布式模式可扩展的,支持故障转移
Connectors
连接器,分为两种 Source(从源数据库拉取pull数据写入Kafka),Sink(从Kafka消费数据写入push目标数据)
连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。
用户可以通过Rest API 启停连接器,查看连接器状态
Confluent 已经提供了许多成熟的连接器,官网地址:Confluent Connector Portfolio
Task
实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task的配置和状态存储在Kafka的Topic中,config.storage.topic
和status.storage.topic
。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道
Worker
刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式
单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式
分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id
的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)
当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启。
Converters
Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换
默认支持以下Converter
-
AvroConverter
io.confluent.connect.avro.AvroConverter
: 需要使用 Schema Registry -
ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
: 需要使用 Schema Registry -
JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
: 需要使用 Schema Registry -
JsonConverter
org.apache.kafka.connect.json.JsonConverter
(无需 Schema Registry): 转换为json结构 -
StringConverter
org.apache.kafka.connect.storage.StringConverter
: 简单的字符串格式 -
ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
: 不做任何转换
Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换
Transforms
连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个链。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决
Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。
快速上手
使用步骤:
1. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2.启动kafka
bin/kafka-server-start.sh config/server.properties
3.准备连接器,这里我是自己写了一个简单的连接器