文章目录
文章学习于 OrcHome 教程
1. 初识 kafka
举个栗子,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋;
- 假设消费者在消费鸡蛋时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了;
- 再假设生产者很强劲,生产者每次生产两个鸡蛋,消费者只能消耗一个鸡蛋,要不了一会,消费者就吃不消了(消息堵塞,导致系统超时),鸡蛋又一次丢失了;
- 这个时候,找个篮子放在中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是传说中的kafka
;
- 鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的,也叫做消息。
- 消息队列满了,其实就是篮子满了,“鸡蛋”放不下了,那多找几个篮子,其实就是 kafka 的扩容;
2. kafka 名词解释
-
producer
:生产者,就是它来生产“鸡蛋”的; -
consumer
:消费者,消费生产的“鸡蛋”; -
topic
:可以将之理解为标签,生产者每产出一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。 -
broker
:就是篮子了。
3. 入门介绍
3.1 主要功能
官网给出了答案:
- It lets you publish and subscribe to streams of records.
- It lets you store streams of records in a fault-tolerant way.
- It lets you process streams of records as they occur.
换句话说: - 发布和订阅消息流,类似于一个消息队列或企业消息系统;
- 以容错(故障转义)的方式存储消息(流);
- 在消息流发生时处理它们。
3.2 kafka 使用场景
官网又给出了答案:
- Building real-time streaming data pipelines that reliably get data between systems or applications.
- Building real-time streaming applications that transform or react to the streams of data.
再换句话说: - 构建实时的数据流管道,可靠的获取系统和应用程序之间的数据;
- 构建实时流的应用程序,对数据流进行转换或反应。
3.3 一些常识
- kafka 作为一个集群运行在一个或多个服务器上;
- kafka 集群存储的消息是以 topic 为类别记录的;
- 每个消息(也叫 record)是由一个 key,一个 value 和时间戳构成。
3.4 kafka 的 4 个核心 API:
- 应用程序使用
Producer API
发布消息到 kafka 集群中的一个或多个 topic 中; - 应用程序使用
Consumer API
来订阅一个或多个 topic,并处理产生的消息; - 应用程序使用
Streams API
充当一个流处理器,从一个或多个 topic 消费输入流,并生产一个输出流到一个或多个输出 topic,有效的将输入流转换到输出流。 -
Connector API
:可构建或运行可重用的生产者或消费者,将 topic 连接到现有的应用程序或数据系统。例如,连接到关系数据库的连接器可以捕获表的每个变更。
3.5 kafka 基本术语
就是前边的四个名词,这一次是准也得术语;
-
topic
:将消息分门别类,每一类的消息称为一个主题(Topic); -
producer
:发布消息的对象称之为 topic 生产者(kafka topic producer); -
consumer
:订阅消息的对象称之为 topic 消费者(kafka topic consumer); -
broker
:已发布的消息保存在一组服务器中,称之为 kafka 集群,集群中的每一个服务器都是一个代理(broker)。消费者可以订阅一个或多个 topic,并从 broker 中拉取数据,从而消费这些已发布的消息。
3.6 Toic 和 Log
- 之前已经对 topic 这个名词进行了解释,它是一堆消息的集合;
- 一个 topic 可以有另个,一个或多个消费者订阅该 topic 的消息;
- 对于每个 topic,kafka 集群都会去维护一个分区 Log,如上图所示。
- 每创建一个 topic,可以指定多个分区(partition),分区数目越多,吞吐量就越大,但是需要的资源也就越多,会造成更高的不可用性;
- kafka 在接收到生产者发送的消息之后,会根据均衡策略 将消息存储到不同的分区(我们假定这个 topic 有三个分区);
- 每个分区中,消息以顺序存储,最晚接收的消息会最后被消费。
- 分区中的消息存储时,都会被分一个序列号,称之为偏移量(offset),在每个分区中,偏移量都是唯一的。
- kafka 集群会保持所有的消息,无论你消费不消费,直到他们过期;
- 这偏移量由消费者来掌握,消费者可以将偏移量重置为更早的位置,重新读取消息;
- 一个消费者的操作不会影响其他消费者对此 Log 的处理。
3.7 分布式(Distribution)
- Log 的分区被分布到集群中的多个服务器上,每个服务器处理它分到的分区;
- 根据配置每个分区还可以复制到其他服务器作为备份容错;
- 每个分区有一个 leader,零个或多个 follower;
- leader 处理此分区的所有读写请求;
- follower 被动的复制数据;
- 如果 leader 宕机,其他的一个 follower 会被推举为新的 leader;
- 一台服务器可能同时是一个分区的 leader,另一个分区的 follower;
- 这样可以避免所有的请求都只让一台或者某几台服务器处理。
3.8 Geo-Replication(异地数据同步技术)
- kafka MirrorMaker 为群集提供
geo-replication
支持; - 借助
MirrorMaker
,消息可以跨多个数据中心或云区域进行复制; - 可以在 active/passive 场景中用于备份和恢复;
- 或在 active/passive 方案中将数据置于更接近用户的位置,或数据本地化。
3.9 生产者(producers)
- 生产者往某个 Topic 上发布消息;
- 生产者也负责选择发布到 topic 上的哪一个分区;
- 最简单的方式是从分区列表中轮流选择;
- 也可以依照某种算法依照权重选择分区;
- 开发者负责如何选择分区的算法。
3.10 消费者(Consumers)
-
通常来讲,消息模型可以分为两种:
- 队列模型;
- 发布-订阅模型;
-
队列模型的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理;
-
发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息;
-
Kafka 为这两种模型提供了单一的消费者抽象模型:消费者组(consumer group);
- 消费者用一个消费者组名标记自己,一个发布在 Topic 上消息被分发给此消费者组中的一个消费者;
- 假如所有的消费者都在一个组中,那么这就变成了队列模型;
- 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型;
- 一般可以创建一些消费者组作为逻辑上的订阅者,每个组包含数目不等的消费者,一个组内多个消费者可以用来扩展性能和容错。
-
传统的队列模型保持消息,并且保证消息的先后顺序不变;
-
并行消费不能保证消息的先后顺序;
-
kafka 采用分区策略,是因为 topic 分区中消息只能由消费者组中的唯一一个消费者来处理,所以消息肯定是按照先后顺序进行处理的;但是它也仅仅只能保证一个分区的顺序处理,不能保证跨分区的消息先后处理顺序;
3.11 Kafka 的保证(Guarantees)
- 生产者发送到一个特定的 topic 分区上,消息将会按照它们发送的顺序依次假如;
- 也就是说,如果一个消息 M1 和消息 M2 使用相同的 Producer 发送,M1 先发送,name M1 将比 M2 的 offset 低,并且优先的出现在日志中;
- 消费者收到的消息也是此顺序;
- 如果一个 topic 配置了复制银因子(replication factor)为 N,那么可以允许 N-1 服务器宕机而不丢失任何已经提交(committed)的消息。
3.12 Kafka 的流处理
- Kafka 的目标是实时的流处理。
- 在 Kafka 中,流处理持续获取
输入 topic
的数据,进行处理加工,然后写入输出 topic
; - 可以直接使用 producer API 和 consumer API 进行简单的处理;
- 对于复杂的转换,可以使用 Streams API 处理。
- Streams API 在 Kafka 中的核心:使用 producer API 和 consumer API 作为输入,利用 Kafka 做状态存储,使用相同的组机制在 stream 处理器实例之间进行容错保障。
4. Kafka 的使用场景
- 消息处理:
- Kafka 更好的替换传统的消息系统,具有更好的吞吐量,内置分区,副本和故障转移,有利于处理大规模的消息;
- 网站活动追踪:
- 对于用户和网站的活动发布到不同的 topic 中心,这些消息可以实时处理,实时检测,也可加载奥 Hadoop 或离线处理数据仓库。
- 指标:
- 用于监测数据,分布式应用程序生成的统计数据集合聚合。
- 日志聚合:
- 日志聚合通常从服务器中收集物理日志文件,并将它们放在*位置(可能是文件服务器或HDFS)进行处理。
- Kafka 抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。
- 流处理;
- 事件采集;
- 提交日志;
5. Kafka 安装和启动
5.1 安装
tar -xzf kafka_2.13-2.8.0.tgz
之后的几步了解下就行,最终会以系统服务方式启动,避免这种繁琐的方式。
5.2 启动
- 进入 kafka 的目录:
cd kafka_2.13-2.8.0
- 终端1:启动 kafka 自带打包和配置好的 Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- 终端2:启动 kafka 服务:
bin/kafka-server-start.sh config/server.properties
- 以上两个终端不能关,关了就是用不了了。
5.3 创建一个 topic
- 终端3:创建一个名为“kafkaTest”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkaTest
- 查看已创建的 topic 信息:
bin/kafka-topics.sh --describe --topic kafkaTest --bootstrap-server localhost:9092
- 输出:
Topic: kafkaTest TopicId: o0NwNuMKTuORqpEAZ7hBsg PartitionCount: 1 ReplicationFaes=1073741824
Topic: kafkaTest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
5.4 生产消息
- Kafka 提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给 Kafka 集群。每一行是一条消息。
- 运行 producer(生产者),然后在控制台输入几条消息到服务器。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest
5.5 消费消息
- Kafka 也提供了一个消费消息的命令行工具,将存储的信息输出出来;
- 终端4:新打开一个命令控制台,输入:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkaTest --from-beginning
6. 以系统服务方式启动 kafka
6.1 创建服务文件
- kafka安装地址在
/usr/local/kafka
; - 创建
/usr/lib/systemd/system/zookeeper.service
并写入:
[Unit]
Requires=network.target
After=network.target
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target
- 创建
/usr/lib/systemd/system/kafka.service
并写入:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target
6.2 启动服务
systemctl daemon-reload
systemctl enable zookeeper && systemctl enable kafka
systemctl start zookeeper && systemctl start kafka
systemctl status zookeeper && systemctl status kafka