一、Kafka Connect
- Connect 是 Kafka 的一部分,它为在Kafka和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方式
- 它为连接器插件提供了一组API和一个运行时——Connect负责运行这些插件,它们则负责移动数据
- Connect以worker进程集群的方式运行,我们基于worker进程安装连接器插件,然后使用REST API来管理和配置 connector ,这些worker进程都是长时间持续运行的作业
-
连接器启动额外的task, 有效地利用工作节点的资源,以并行的方式移动大量的数据。数据源的连接器负责从源系统读取数据,并把数据对象提供给worker进程。数据池的连接器负责从 worker进程获取数据,并把它们写入目标系统
- Connect通过connector在Kafka里存储不同格式的数据
- Kafka支持JSON,而且Confluent Schema Registry提供了Avro转换器。开发人员可以选择数据的存储格式,这些完全独立于他们所使用的连接器
二、运行Connect
- Connect随着Kafka一起发布,所以无需单独安装。如果你打算在生产环境使用 Connect 来移动大量的数据,或者打算运行多个连接器,那么 最好把 Connect 部署在独立于 broker 的服务器上。在所有的机器上安装 Kafka,并在部分服务器上启动 broker,然后在其他服务器上启动 Connect
-
启动命令如下,需要传入一个配置文件:
bin/connect-distributed.sh config/connect-distributed.properties
三、几个重要配置参数
bootstrap.servers
- 该参数列出了将要与 Connect 协同工作的 broker 服务器,连接器将会向这些 broker 写入数据或者从它们那里读取数据。你不需要指定集群所有的 broker,不过建议至少指定3个
group.id
- 具有相同 group id 的worker属于同一个 Connect 集 群。集群的连接器和它们的任务可以运行在任意一个worker上
key.converter和value.converter
- Connect可以处理存储在 Kafka 里的不同格式的数据。这两个参数分别指定了消息的键和值 所使用的转换器。默认使用 Kafka 提供的 JSONConverter,当然也可以配置成 Confluent Schema Registry 提供的 AvroConverter
- 有些转换器还包含了特定的配置参数。例如,通过将key.converter.schema.enable设置成 true 或者 false 来指定 JSON 消息是否可以包含 schema。值转换器也有类似的配置,不过它的 参数名是 value.converter.schema.enable 。Avro 消息也包含了 schema,不过需要通过 key.converter.schema.registry.url 和 value.converter.schema.registry.url 来指定 Schema Registry 的位置
四、通过REST API操作Connect
- 我们一般通过Connect的REST API来配置和监控rest.host.name和rest.port连接器。你可以为 REST API 指定特定的端口
- 在启动worker集群之后,可以通过REST API来验证它们是否运行正常:
curl http://localhost:8083/
- 这个REST URI 应该要返回当前Connect 的版本号。我运行的是Kafka 0.10.1.0(预发行)快照版本
- 还可以检查已经安装好的连接器插件:
curl http://localhost:8083/connector-plugins
单机模式
- 要注意,Connect也支持单机模式。单机模式与分布式模式类似, 只是在启动时使用 bin/connect-standalone.sh代替bin/connect-distributed.sh,也可以通过命令行传入连接器的配置文件,这样就不需要使用REST API了
- 在单机模式下,所有的连接器和任务都运行在单独的worker进程上。单机模式使用起来更简单,特别是在开发和诊断问题的时候,或者是在需要让连接器和任务运行在某台特定机器上的时候(比如Syslog连接器会监听某个端口,所以你需要知道它运行在哪台机器上)