提到消息中间件,Kafka是我们经常提及的,不过消息中间件的属性只能说是它的基础属性之一,如果你去看官网的简介,会称其为分布式事件流平台,事件流的处理才是它最核心的特点;在我看来,Kafka应该是目前性能最高的开源的事件流平台了,很多开发者可能只是使用了Kafka的Produce和Consume接口来进行消息流的应用,其实它的Connect和Stream API是真正能体现Kafka在流处理上的优势的特性。
Kafka的历史
很多时候优秀的开源项目并不是作者突发奇想地写出来的,而是由实际的业务需求推动后逐步完善的。按照*的介绍,在Kafka正式成为一个开源项目前,它的原型来自LinkedIn公司内部的数据中间件系统(大概在2011年),当时开发该系统的工程师们感觉它的设计理念可以适用于很多业务场景,于是重构之后正式开源化。其中一个工程师Jay Kreps在他的博客 The Log: What every software engineer should know about real-time data’s unifying abstraction 中详细叙述了Kafka的由来、设计思想和技术细节;通过他的叙述我们可以知道,当时在LinkedLin的软件系统中,一个最大的痛点是存在很多不同类型的数据源和数据处理子系统,而这些子系统和数据源之间存在极为混乱的数据流交互,因此他们设计了一个基于日志的高性能中间件来进行解耦,这就是Kafka原型最初的设计背景。Kafka这个名字也是Jay Kreps取的,来自于小说家弗兰茨·卡夫卡的名字,就是那个著名的《变形记》的作者
一开始,Kafka是以Apache基金会的名义进行开源的,所以称之为Apache Kafka; 因为这个项目非常火热,并且具有非常大的扩展空间,一家名为Confluent的公司以Apache Kafka为基础组件,扩展了许多其他的功能组件,并称之为Confluent Kafka,可以看成是Apache Kafka的扩展版本,当然这里面某些组件是收费的。为啥这家公司能有能力推出这个加强版,是因为Apache Kafka的大部分的核心贡献者都在这个公司!不太确定是不是这些贡献者创建的这个公司
Apache Kafka 和 Confluent Kafka
如果你去Google搜索Kafka,首先搜出的结果都是Confluent Kafka,而非Apache Kafka,这可能会给初学者带来一些困惑。正如上面所说的Apache Kafka和Confluent Kafka的历史,Confluent Kafka是Confluent公司基于Apache Kafka推出的组件库,Apache Kafka包括的基础组件,也是Confluent Kafka的基本组件,其代码其实就是一样的。下面这张从官网保存的示意图展示了两者的关系,也展示了Confluent Kafka所包含的组件类型,和外部系统的整合关系
Confluent Kafka是Confluent公司推出的,所以包含了非开源组件或服务,不过不用太担心,Apache Kafka的组件,仍然是随意使用的,只有部分组件才是遵循商业许可证的。事实上,Confluent Kafka制定了两种License:Community和Commercially。Community就是社区版,基本等同于Apache 2.0;Commercially则是商业版。可以在Confluent Kafka的官方问答中查看具体信息(Confluent Community License FAQ)
对于想要查看文档的初学者来说,推荐还是直接看Confluent Kafka的官方文档,虽然说Apache Kafka和Confluent Kafka是并存的状态,它们各自的文档都被持续维护,但是Confluent Kafka的官方文档更加详细,新手教程更多。
Apache Kafka的官方文档地址: Apache Kafka Documentation
Confluent Kafka的官方文档地址: Confluent Kafka Documentation
Kafka和Zookeeper
在安装和配置Kafka环境时,你会发现Zookeeper模块是必需的核心模块。Kafka最开始就是设计为分布式系统的,不过这个项目本身要处理的核心问题是基于日志模型的数据流,至于集群内节点的管理则直接使用了Zookeeper这个项目。这一设计其实存在某些问题,尤其当Kafka这个项目变得越来越大,甚至扩展出商用组件的时候,因为这两个项目都是面向分布式的,各自的系统设计又有很大区别,对开发和维护者来说,两个系统存在功能冗余;对使用者来说,则需要额外学习Zookeeper的知识体系。Kafka的贡献者们也早就意识到这个问题,从Apache Kafka 2.8版本开始,他们开始尝试去除掉对Zookeeper的依赖,改为开发Kafka自身的集群元数据管理模块,不过目前这一设计还不完善。你可以在这篇博客中查看细节: Removing the Apache ZooKeeper Dependency
总之,目前要搭建一个Kafka环境,还是绕不开Zookeeper的
搭建一个Kafka环境
现在让我们先搭建一个单节点的Kafka环境。
一种简单的方案是按照Apache Kafka的Quick Start文档,直接下载Apache Kafka的文件包,本质上Apache Kafka的各个模块其实就是运行这个文件包内的不同执行程序。不过这种安装方式太过朴素,我们后续还得自己动手完善环境,比如得自己创建服务执行脚本,把文件包内的Zookeeper和Kafka打包成自启动服务。
现在我们一般都会采取流行的Docker容器方案进行安装,当然前提是你需要有Docker环境和基本的相关知识。Apache Kafka本身是没有提供Docker镜像的,很多第三方的开源项目提供了不同的镜像,Confluent Kafka同样如此,而且Confluent的目的是提供以Apache Kafka为基础的平台,所以除了Apache Kakfa的镜像,也提供了多个其他组件的镜像。
下面的docker-compose.yml文件是从官方参考中拷贝的,并且我把非社区版的组件删除了,只保留了几个最基础的Kafka镜像
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
hostname: zookeeper
container_name: zookeeper
restart: always
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.1 #cp-kafka is Community licensed, cp-server is Commercially licensed
hostname: broker
container_name: broker
depends_on:
- zookeeper
restart: always
ports:
- "29092:29092"
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
#KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
restart: always
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.0.1 #cp-ksqldb-server contains ksqldb-server
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
restart: always
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.0.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
restart: always
entrypoint: /bin/sh
tty: true
rest-proxy:
image: confluentinc/cp-kafka-rest:7.0.1
restart: always
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
简要说明一下这个docker-compose.yml里包含的镜像
- confluentinc/cp-zookeeper,Kafka必需的基础服务
- confluentinc/cp-kafka,Apache Kafka在Confluent平台下的打包镜像
- confluentinc/cp-schema-registry,Kafka Schema Registry服务,Schema Registry用于Kafka通信中结构化数据的序列化/反序列化,也是属于Kafka最基础最核心的功能之一
- confluentinc/cp-ksqldb-server,ksqlDB服务器的镜像,ksqlDB是一个提供类SQL语言来构建Kakfa流应用的框架,这个服务不是必需的
- confluentinc/cp-ksqldb-cli,用于连接ksqlDB并执行命令的CLI工具,这个服务不是必需的
- confluentinc/cp-kafka-rest,Confluent REST Proxy服务的镜像,提供一系列和Kafka集群交互的API接口,这个服务不是必需的
一个Kafka集群环境,只有Zookeeper、Kafka Broker和Schema Registry是必需的服务,所以如果不希望太多的扩展内容造成学习的干扰,可以就只保留这个三个的镜像配置。在进行Docker容器部署前,还需要注意一点,在docker-compose.yml的broker服务配置中,KAFKA_ADVERTISED_LISTENERS配置项的值:PLAINTEXT_HOST://localhost:9092,最好把这个localhost改成你自己的服务器的实际IP,否则可能无法远程访问你服务器上的Kafka集群
执行如下的命令来拉取镜像并启动各个服务的容器
sudo docker-compose up -d
通过命令
sudo docker ps -a
来查看各个Kafka服务的启动情况
要检查Kafka集群是否能被外部系统访问,可以使用一些第三方的Kafka客户端产品。我这里是使用了一个名叫Conduktor的产品,完整的功能是收费的,不过只用它来检查连通性就没必要购买许可证了。
Produce和Consume测试
在Kafka的术语中,消息的发送者称之为Producer,消息的接收者称为Consumer,Kafka集群里的节点称为Broker,Kakfa集群把消息放置在不同的Topic里面进行管理。下面我们采用两种方式进行简单的消息生产和消费的测试
通过REST Proxy API
这种测试方式前提是需要按照上文介绍的Docker安装,安装了Confluentinc REST Proxy服务。
打开一个控制台,先执行下面这个curl命令来生产一条消息
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/jsontest"
这个API的调用参数里,发送了一条{“foo”:“bar”} 的简单数据,而URL里的jsontest则指定了topic的名字;如果执行成功,应该会打印出如下的信息,在Kafka集群中,创建一个叫jsontest的topic,里面存放了一条值为{“foo”:“bar”}的消息
接下来几个API操作都是consume操作。先用这个指令创建一个消费者实例
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer
再用下面这个指令让这个消费者实例注册到刚才的topic下面
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
最后用这个指令让消费者从jsontest中拉取数据
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
如果执行成功,可以在控制台中看到拉取到的数据
通过Python脚本
上面通过REST Proxy API的方式,创建的生产者和消费者其实都只是存在于Kafka集群里的缓存实例,正常应用里,需要远程访问Kafka。下面我们写两个python脚本,一个作为Producer应用,一个作为Consumer应用,通过Confluent Kafka Producer\Consumer库,远程连接Kafka集群进行测试。当然,其他语言也有对应的Producer\Consumer库。
首先是Producer程序。先安装好confluent_kafka模块,再写下如下的代码:
rom confluent_kafka import Producer
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
topic='jsontest'
producer = Producer({'bootstrap.servers': '192.168.56.101:9092'})
data='''
{"foo":"bar"}
'''
producer.poll(0)
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
producer.produce('topic', data.encode('utf-8'), callback=delivery_report)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
producer.flush()
这个代码很简单,在初始化Producer对象的时候指定Kafka集群,再通过produce方法向指定的topic推送数据
然后是Consumer的代码
from confluent_kafka import Consumer
topic='jsontest'
consumer = Consumer({
'bootstrap.servers': '192.168.56.101:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
consumer.subscribe([topic])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
consumer.close()
在初始化的时候同样指定Kafka集群,再使用subscribe方法来注册到指定的topic下,最后用poll方法来拉取数据
测试的时候,可以先运行Consumer程序,其控制台输出保持空,直到启动Producer程序,会打印出数据