1.kafka 系统架构
1.1 broker
- kafka 集群包含一个或多个服务器,服务器节点称为 broker。
1.2 Tpoic
- 每条发布到 kafka 集群的消息都有一个类别,这个类别称为 Topic。
- 类似于数据的表名。
- 物理上不同的 Topic 的消息分开存储。
- 逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存储于何处。
1.3 Partition
- Topic 中的数据分割为一个或多个 partition。
- 每个 Topic 至少有一个 partition,当生产者生产数据的时候,根据分配策略,选择分区,然后将消息追加到指定的分区的末尾(队列)。
partition 数据路由规则:
- 指定了 partition ,则直接使用。
- 未指定 partition 但指定 key,通过对 key 的 value 进行 hash 选出一个 partition。
- partition 和 key 都未指定,使用轮询选出一个 partition。
- 每个消息都会有一个自增的编号:标识顺序;用于标识消息的偏移量。
- 每个 partition 中的数据使用多个 segment 文件存储。
- partition 中的数据是有序的,不同 partition 间的数据丢失了数据的顺序。
- 如果 topic 有多个 partition,消费时就不能保证数据的顺序。严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。
1.4 Leader
- 每个 partition 有多个副本,其中有且仅有一个作为 leader,leader 是当前负责数据的读写 partition。
- producer 先从 zookeeper 的 “/brokers/.../state”节点找到该 partition 的 leader。
- producer 将消息发送给该 leader。
- leader 将消息写入本地 log。
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ack。
- leader 收到所有 ISR 中的 relica 的 ack 后,增加 HW 并向 producer 发送 ack。
1.5 Follower
- follower 跟随 leader,所有写请求都通过 leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。
- 如果 Leader 失效,则从 Follower 中选举出一个新的 leader。
- 当 Follower 挂掉、卡主或者同步太慢,leader 会把这个 follower 从 “in sync replicas” (ISR)列表中删除,重新创建一个 Follower。
1.6 replication
- 数据会存放到 topic 的 partition 中,但是有可能分区会损坏。
- 我们需要分兑取的数据进行备份(备份多少取决于你对数据的重视程度)
- 我们将分区 分为 Leader(1) 和 Follower(N)
Leader 负责写入和读取数据;Follower 只负责备份;保证了数据的一致性。
- 备份数为 N,表示 主+备=(N)
Kafka 分配 Replication 的算法如下:
- 将所有 broker(假设供 n 个 broker) 和 待分配的 partition 排序。
- 将低 i 个 partition 分配到第 (i mod n)个 broker 上
- 将第 i 个 partition 的第 j 个 replica 分配到第 ((i+j) mod n)个 broker 上
1.7 producer
- 生产者即数据的发布者,该角色将消息发布到 Kafka 的 Topic 中
- broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。
- 生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。
1.8 consumer
- 消费者可以从 broker 中读取数据。消费者可以消费多个 topic 数据。
- Kafka 提供了两套 consumer API
- The high-level Consumer API
- The SimpleConsumer API
- high-level Consumer API 提供了一个从 Kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多的关注细节。
1.9 consumer Group
- 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 Consumer Group,若不指定 group 则属于默认的 group)
- 将多个消费者集中到一起去处理某个 Topic 的数据,可以更快的提高数据的消费能力。
- 整个消费者组共享一组偏移量(防止数据被重复读取),因为 Topic 有多个分区。
1.10 offset 偏移量
- 可以唯一的标识一条消息
- 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。
- 消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息
- 我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
- 消息最终还是会被删除的,默认声明周期为 1 周。
1.11 zookeeper
- Kafka 通过 zookeeper 来存储集群的 meta 信息。
2.Kafka 环境搭建
基于 Zookeeper 搭建并开启
./zkServer.sh start
若不知道如何安装Zookeeper可参考 Zookeeper 安装教程链接
配置Kafka
1)长传压缩包,解压到 /usr/soft 目录,文件夹名称为 kafka
2)修改配置文件【server.properties】
vim server.properties
修改如下对应的配置项 ,注意修改
#集群中,每个broker.id都不能相同
broker.id=0
#根据端口占用情况修改
port=9092
#开启远程监听,让第三方连接,改自己本机ip地址
listeners=PLAINTEXT://192.168.65.129:9092
#允许彻底删除
delete.enable.topic=true
# log文件目录
log.dirs=/usr/soft/kafka-logs
# 配置 zookeeper 服务器的 ip:port,使用英文逗号分隔
zookeeper.connect=192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181
3)修改环境变量
vim /etc/profile
在文件末尾添加如下内容
export KAFKA_HOME=/usr/soft/kafka
export PATH=$KAFKA_HOME/bin:$PATH
配置文件生效
source /etc/profile
4)另外两台机器按照 1,2,3的步骤进行操作。
注意:server.properties 中的 broker.id 分别改成 1、2。
5)启动集群。
kafka-server-start.sh /usr/soft/kafka/config/server.properties
注意:需要启动前要关闭防火墙
- systemctl stop firewalld.service
- 永久关闭 systemctl disable firewalld.service
3.常用命令
创建 Topic
kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --create --replication-factor 2 --partitions 2 --topic trans
- --create: 指定创建topic动作
- --topic:指定新建topic的名称
- --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
- --partitions:指定当前创建的kafka分区数量,默认为1个
- --replication-factor:指定每个分区的复制因子个数,默认1个
删除 tpoic
kafka-topics.sh --delete --zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --topic trans
查看主题
kafka-topics.sh -zookeeper 192.168.65.128:2181,192.168.65.129:2181,192.168.65.130:2181 --describe --topic trans
创建生产者
kafka-console-producer.sh --broker-list 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092 --topic trans
创建消费者
kafka-console-consumer.sh --bootstrap-server 192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092 --topic trans --from-beginning
在生产者窗口中输入信息,在消费者窗口中可以看到,如下图,【虚拟机1】是生产者,【虚拟机2】是消费者。
4.Kafka 数据检索机制
- topic 在物理层面以 partition 为分组,一个 topic 可以分成若干个 partition
- partition 还可以细分为 Segment,一个 partition 物理上由多个 Segment 组成
segment 的参数有两个:
- log.segment.bytes:单个 segment 可容纳的最大数据量,默认为 1GB。
- log.segment.ms:Kafka 在 commit 一个未写满的 segment 前,所等待的时间(默认为 7 天)
- LogSegment 文件由两部分组成,分别为 “.index” 和 “.log” 文件文件,分别表示为 Segment 所有文件和数据文件。
partition 全局的第一个 Segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。
数值大小为 64 位,20位数字长度,没有数字用 0 填充
- 第一个 segment
00000000000000000000.index
00000000000000000000.log
- 第二个 segment,文件名以第一个 Segment 的最后一条消息的 offset 组成
00000000000000170410.index
00000000000000170410.log
- 第三个 segment,文件名以上一个 Segment 的最后一条消息的 offset 组成
00000000000000239430.index
00000000000000239430.log
- 消息都具有固定的物理结构,包括:offset(8 bytes)、消息体的大小(4 bytes)、crc32(4 bytes)、magic(1 Byte)、attribute(1 Byte)、key length(4 Bytes)、key(4 Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
5.数据的安全性
0:At least one 消息不会丢,但可能会重复传输
1:At most once 消息可能会丢,但绝不会重复传输
3:Exactly once 每条消息肯定会被传输一次且仅传输一次
5.1 producer delivery guarantee
producer 可以选择是否为数据的写入接收 ack,有以下几种选项:request.required.acks
- asks=0:Producer 在 ISR 中的 Leader 已经成功收到的数据并得到确认后发送下一条 Message。若 Producer 没有收到确认,可能会重复发消息。
- asks=1:这意味着 Producer 无需等待来自 Broker 的确认而继续发送下一条消息。
- asks=all:Producer 需要等待 ISR 中的所有 Follower 都确认接收到数据后才算一次发送完成,可靠性最高。
5.2 ISR 机制
关键词:
- AR :Assigned Replicas 用来标识副本的全集。
- OSR:out sync Replicas 离开同步队列的副本。
- ISR:in sync Replicas 加入同步队列的副本。
- ISR = Leader + 没有落后太多的副本(follower);AR = OSR + ISR
我们备份数据就是防止数据丢失,当主节点挂掉时,可以启用备份节点
- producer--push-->leader
- leader<--pull--follower
- follower 每隔一定时间去 Leader 拉取数据,来保证数据的同步
ISR(in sync Replicas)
- 当主节点挂掉,并不是去 follower 选择主,而是从 ISR 中选择主
- 判断标准:超过 10 秒钟没有同步数据(replica.lag.time.max.ms=10000);主副节点差 4000 条数据(replica.lag.time.max.messages=4000)
- 脏节点选举:kafka 采用一种降级措施来处理;选举第一个恢复的 node 作为 leader 提供服务,以它的数据为基准,这个措施被称为脏 leader 选举。
5.3 broker 数据存储机制
物理消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
5.4 consumer delivery guarantee
- 如果 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取过程,那 kafka 确保了 Exactly once。
- 读完消息先 commit 再处理消息
如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作就无法读取到刚刚已提交而未处理的消息。这对应于 At most once。
- 读完消息先处理再 commit 数据
如果在处理消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
- 如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。经典的做法是引入两阶段提交。
- Kafka 默认保证 At least once。
5.5 数据的消费
partition_num = 2,启动一个 consumer 进程订阅这个 topic,对应的, stream_num 设置为 2,也就是说启动连个线程并行处理 message。
如果 auto.commit.enable=true
- 当 consumer 拉取了一些数据但还没有处完全处理掉的时候
- 刚刚到 commit interval 发出了提交 offset 操作,接着 consumer crash 掉了。
- 这是 拉取的数据还没未完成处理但已经被 commit 了,因此没有机会再次被处理,数据丢失
如果 auto.commit.enable=false
- 假设 consumer 的两个拉取各自拿了一条数据,并且由两个线程同时处理
- 这时线程 t1 处理完 partition1 的数据,手动提交 offset。
- 这里需要说明的是,当手动执行 commit 的时候,实际上是对这个 consumer 进程所占有 partition 进行 commit。也就是说即使 t2 没有处理完 partition2的数据,offset 也被 t1 提交了。这时如果 consumer crash 掉,t2 正在处理的这条数据就丢失了。
解决办法1:将多线程转成单线程
手动 commit offset,并且对 partition_num 启同样数目的 consumer 进程,这样就能保证一个 consumer 进程占有一个 partition,commit offset的时候不会影响别的 partition 的 offset。但这个方法比较局限,因为 partition 和 consumer 的数目必须严格对应
解决办法2:批量 commit
手动 commit offset,另外在 consumer 端将将所有 拉取的数据缓存到 queue 里,当把queue 里所有的数据处理完之后,再批量提交 offset,这样就能保证只有处理完才被 commit。
6.Java API
注意,需要先导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
6.1 生产者代码
public class Hello01Producer extends Thread {
private Producer<String,String> producer;
/**
* 构造器函数
*/
public Hello01Producer(String threadName) {
//设置线程名称
super.setName(threadName);
//创建配置文件列表
Properties properties = new Properties();
//kafka地址,多个地址用逗号分隔
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092");
//key 值的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
//VALUE 值的序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getTypeName());
//写出的应答方式
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//在重试发送失败的request前的等待时间
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批量写出
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//创建生产者对象
producer = new KafkaProducer<>(properties);
}
@Override
public void run() {
//初始化一个计数器
int count = 0;
System.out.println("Hello01Producer.run()——开始发送数据");
while (count < 100000) {
String key = String.valueOf(count++);
String value = Thread.currentThread().getName() + "--" + count;
//封装消息对象
ProducerRecord<String,String> msg = new ProducerRecord<>("trans",key,value);
//发送消息到 Kafka
producer.send(msg);
//打印消息
System.out.println("Producer.send--" +key + "--"+value);
//每隔0.1秒发送一次
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Hello01Producer producer = new Hello01Producer("测试发送");
producer.start();
Thread.sleep(Long.MAX_VALUE);
}
}
6.2 消费者代码
public class Hello01Consumer extends Thread {
private Consumer<String,String> consumer;
public Hello01Consumer(String name) {
super.setName(name);
//创建配置文件列表
Properties properties = new Properties();
//kafka地址,多个地址用逗号分隔
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.128:9092,192.168.65.129:9092,192.168.65.130:9092");
//创建组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-data");
//key 值的序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName());
//VALUE 值的序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getTypeName());
//开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//自动提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//自动重置的偏移量
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//实例化消费组
consumer = new KafkaConsumer<String, String>(properties);
}
@Override
public void run() {
//创建list ,元素为 topic 和对应分区
List<TopicPartition> list = new ArrayList<>();
list.add(new TopicPartition("trans",0));
list.add(new TopicPartition("trans",1));
//assign和subscribe 都可以消费 topic
consumer.assign(list);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records) {
System.out.println("接收到数据:" + record.value() + " partition:"+record.partition() + " offset:"+record.offset());
}
}
}
public static void main(String[] args) throws InterruptedException {
Hello01Consumer consumer = new Hello01Consumer("测试接收");
consumer.start();
Thread.sleep(Long.MAX_VALUE);
}
}
6.3 重复消费和数据的丢失
有可能一个消费者取出了一条数据,但是还没有处理完,但是消费者被关闭了。
消费者自动提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")
若提交间隔 > 单条数据执行时间 (重复)
若提交间隔 < 单条数据执行时间 (丢失)
7.Kafka 优化
9.1 Partition 数目
一般来说,每个 partition 能处理的吞吐为 几 MB/s (具体需要根据本地环境测试),增加更多的 partition 意味着:
- 更高的并发与吞吐
- 可以扩展更多的 consumers(同一个 consumer group 中)
- 若是集群中有较多的 brokers,则可更大程度上利用闲置的 brokers。
- 但是会造成 Zookeeper 的更多选举。
- 也会在 Kafka 中打开更多的文件。
调整准则:
- 一般来说,若是集群较小(小于 6 个 brokers),则 partition_num = 2 x broker_num。这里主要考虑的是之后的扩展。若是集群扩展了一般(例如 12 个),则不用担心会有 partition 不足的现象发生。
- 一般来说,若是集群较大(超过 12 个 brokers),则 partition_num = broker_num。因为这里不再需要再考虑集群的扩展情况,与 broker 相同的 partition 数已经足够应付常规场景。若有必要再手动调整。
- 考虑最高峰吞吐需要的并行 consumer 数,调整 partition 的数目。若是应用场景需要 20 个consumer(同一个 consumer group)并行消费,则据此设置为 20 个 partition
- 考虑 producer 所需的吞吐,调整 partition 数目(如果 producer 的吞吐非常高,或者是在接下来两年内都比较高,则增加 partition 的数目)
9.2 Replication factor
此参数决定的是 records 复制的数目,建议至少设置为 2,一般是 3,最高设置为 4。
更高的 replication factor(假设数目为 N),意味着:
- 系统更未定(允许 N-1 个 broker 宕机)
- 更多的副本(如果 acks=all,则会造成较高的延时)
- 系统磁盘的使用率会更高(一般若是 RF 为 3,则相对于 RF 为 2 时,会占据更多 50% 的磁盘空间)
调整准则:
- 以 3 为起始(当然至少需要有 3 个 broker,同时也不建议 一个 Kafka 集群中节点数少于 3 个节点)
- 如果 replication 性能称为了性能瓶颈或是一个 issue,则建议使用性能更好的 broker,而不是降低 RF 的数目
- 永远不要在生产环境中设置 RF 为 1
9.3 批量写入
为了大幅度提高 producer 写入吞吐量,需要定期批量写文件:
- 每当 producer 写入 10000 条消息时,刷数据到磁盘
log.flush.interval.message=10000
- 每隔1秒钟,刷数据到磁盘
log.flush.interval.ms=1000