仅仅记录最近学习Kafka笔记
视频地址:https://www.bilibili.com/video/BV1Xy4y1G7zA?p=25
kafka视频笔记
命令:
创建生产者:kafka-console-producer.bat --broker-list localhost:9092 --topic yi
创建消费者(带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092 --consumer-property group.id=testgroup --topic yi
创建消费者(不带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic yi
查询所有消费组:kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
查询消费组详细信息:kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testgroup
查询主题详细信息:kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
创建2分区3副本的主题:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
集群创建生产者:kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic
集群创建消费者(带消费组):kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
创建2分区3副本的主题 这个命令可以通过看zk的节点或者看对应的log目录是否多出同名多分区的文件夹。
重点关注:
current-offset:最后被消费的消息的偏移量
log-end-offset:消息总量(最后一条消息的偏移量)
lag:积压了多少条消息
稀疏存储
详细的解释:https://blog.csdn.net/shudaqi2010/article/details/90815675
其实就是根据24小时根据数据条数 默认将id放到index 和 timeindex.log文件内 方便快速查找
分区:
分区的概念就是为了将数据给分开存储 否则几个T的数据都在一个topic下会很大
默认的主题__consumer_offsets(整个kafka集群里只有一份)
kafka内部创建了__consumer_offsets主题包含了50个分区。这个主题用于存放消费者某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费者的主题的偏移量自助上报给kafka的默认的主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数
提交到该主题的内容是:key是consumerGroupid+topid+分区号,value是当前offset值
文件中保存的消息,默认保存7天,七天到后消息会被删除
一个分区最多只能被一个消费组里的一个消费者所消费
副本:
可以理解为备份
集群中有多个broker,创建主题时可以指明主题有多个分区(把分区拆分到不同的分区中存储)当ISR 是指当leader挂了的时候会从leader里面集合中读取,可以为分区创建多个副本,不同的副本存放在不同的broker里
创建带有多分区、多副本 查看详细信息里 有如下几个信息
leader:
kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生新的leader
isr:
可以同步和已同步的节点会被存入isr集合中。这里有一个细节:如果isr中节点性能较差,会被剔除ist集合,当leader挂了的时候会从leader里面集合中读取
多播和单播区别
多播(多个地方消费):是指多个消费组里 最新的消费者可以接收到消息 多消费者接收
单播(只有一个地方消费):是指一个消费组里 最新的消费组可以接收到消息 一个消费者接收
在企业中为了保证数据的安全性和一致性 它数据存储在队列上,实际是放到内存上rabbitmq
如果是java 协议支持很多,使用其他的程序,除了java的 它数据存储在队列上,实际是放到内存上 avtivemq
如果你的数据很庞大(大数据) 日志分析,用户行为分析,支持动态扩容,它数据存储在队列上,实际是放到磁盘上,(某个文件)坏处就是不能保证消息的可靠性,kafka
生产者中的ack配置:
在同步发送的前提下,生产者在获得集群返回的ack之前会一直堵塞。那么集群什么时候返回ack呢?此时ack有三个配置
ack = 0: kafka-cluster 不需要任何broker收到消息,就立即返回ack给生产者,最容易丢消息,效率是最高的
ack = 1(默认): 多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的。
ack = 1/all 。 里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2,如果这个走默认配置,那么和ack = 1 无任何区别),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已经完成了数据接收),这种方式最安全,但是性能最差
java客户端配置的
生产者配置:缓冲区
kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
ProducerConfig.BUGGER_MEMORY_CONFIG,33554432
kafka本地线程会去缓冲区中拉一次16k的数据,发送到broker
ProducerConfig.BATCH_SIZE_CONFIG,16384
如果线程拉不到16k的数据,间隔10毫秒,也会将已拉到的数据发到broker
ProducerConfig.LINGER_MS_CONFIG,10
消费者配置:自动/手动 提交
关于消费者自动提交和手动提交offset
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息的提交到_consumer_offsets主题里面
自动提交
消费者poll消息下来以后就会自动提交offset(这玩意吧。按照我理解 他同步的topic内,这个客户端把消息都拿来了 那么offset就将下标回馈到broker的是下拿到的数据lengh+1的offset)
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"
注意:自动提交会丢消息,因为消费者在消费前提交的offset,有可能提交完后,消息者挂了
手动提交
需要把自动提交的配置改成false
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"
手动提交分2种(一般用的是同步,懒得解释同步异步啥区别)
手动同步提交
手动异步提交
消费者配置:长轮询poll消息
默认的情况,消费者会一次性poll500条消息
CosumerConfig.MAX_POLL_RECORED_CONFIG,"500"
代码中设置了长轮询的时间是1000毫秒
consumer.poll(Duration.ofMillis(1000));
意味着
如果一次poll到500条数据,就直接执行for循环
如果这一次没有poll到500条数据。且时间在一秒内,那么长轮训继续poll,要么到500条,要么到1秒
如果多次poll都没达到500条,且一秒时间到了,那么直接执行for循环
如果两次poll的间隔超过30秒,集群会认为该消费者的能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息少一点
CosumerConfig.MAX_POLL_RECORED_CONFIG,"500"
CosumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000
消费者配置:指定分区和偏移量、时间消费
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)))
从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)))
consumer.seek(new TopicPartition(TOPIC_NAME,0),10)
指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费
消费者配置:新消费者的消费offset的规则
新消费组的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息。)可以通过以下设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
Latest:默认,消费新消息
earliest:第一次从头开始消费。之后开始消费新的消息
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"
消息的细节
生产者将消息发送给broker,broker会将消息保存在本地的日志文件中?
C:\Kafka\kafka_2.12-2.8.1\kafka-logs\主题-分区\0000000000000.log
消息的保存是有序的,通过offset偏移量来描述消息的有效性
消费者消费消息时也是通过offset来描述当前消费的那条消息的位置
kafka为了保证消费的顺序,所以一个分区只能被同一个消费组里的一个消费组所消费
如果消息发给多个分区,那么顺序是不可以保证的
kafka: 3.0开始不需要zk也可以了
stream 是干嘛的? 消息二次加工!
磁盘带索引的读取 要比内存随机读取要快??
zookeeper 主要功能就是做分布式协调的