一、为何使用消息队列
- 流量削峰
- 系统解耦
- 异步处理
二、为何使用Kafka
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
生产者-消费者 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
应答模式 | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持,JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级 | 万级 | 十万级 | 单机万级 |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
1.如何支持生产者-消费者模式以及发布订阅模式?
生产者将消息发送到分区,由不同的消费者对topic来进行订阅。
如果消费者的组id相同,则当前分区的消息只能被同一个消费者组的一个消费者消费。
如果消费者的组id不同,则当前分区的消息可以被每个消费者组的一个消费者消费。
2.为什么快?
a.顺序写入
b.可利用partition并行处理
c.批处理以及压缩
c.利用page cache
Broker 收到数据后,写磁盘时只是将数据写入 Page Cache,并不保证数据一定完全写入磁盘。consumer读取数据也是先在Page Cache中读取数据,不存在才会去磁盘进行查找。并且Page Cache有一定的预读功能
b.基于sendfile实现零拷贝
KAFKA推送消息用到了sendfile,落盘技术用到了mmap
传统数据读取
传统数据读取由于用户态无法直接访问内核态,因此调用read读取数据的时候会涉及到用户态到内核态的切换以及内核态到用户态的切换,并且读取数据需要DMA拷贝从磁盘拷贝到内核缓冲区,CPU从内核缓冲区拷贝用户缓冲区,然后在写入到网卡再次经历类似的步骤,因此,共涉及了4次用户态与内核态的切换,2次DMA拷贝以及1次CPU拷贝。
sendFile实现零拷贝
sendFile可以直接在内核态将缓冲区数据从源端拷贝到目的端,因此原来read+write由一个sendFile调用即完成了,减少了2次内核态、用户态的切换以及一次CPU拷贝过程。并且在部分硬件支持的情况下,步骤2和3也可以省略,可以直接由DMA将内核缓冲区数据拷贝到网卡,真正实现零拷贝。
mmap
对于kafka来说,Producer生产的数据存到broker,这个过程读取到socket buffer的网络数据,其实可以直接在OS内核缓冲区,完成落盘。并没有必要将socket buffer的网络数据,读取到应用进程缓冲区;在这里应用进程缓冲区其实就是broker,broker收到生产者的数据,就是为了持久化。
mmap 就是在用户态直接引用文件句柄,也就是用户态和内核态共享内核态的数据缓冲区,此时数据不需要复制到用户态空间。当应用程序往 mmap 输出数据时,此时就直接输出到了内核态数据,如果此时输出设备是磁盘的话,会直接写盘(flush间隔是30秒)。
3.为什么能保证消息有序、不丢失数据以及重复消费?
kafka的副本集由AR,ISR,OSR的概念,AR即当前topic当前分区的所有副本集合,ISR为与leader副本数据保持一定程度同步的副本,OSR为与leader副本数据同步程度较低的副本。因此
1、当leader副本下线的时候,选举leader可配置为从ISR选举leader,避免数据丢失
2、配置acks的参数值为all,表示需要所有副本都成功写入消息才表示消息推送成功
3、而且producer发送的时候可以异步发送在回调方法中处理异常
因此能保证消息不丢失
kafka通过偏移量的机制来保证consumer拉取消息,由于kafka写入是顺序写入,消费消息也根据偏移量拉取消息,保证了消息有序。通过自动提交、手动提交偏移量等机制来保证不重复消费,但是在某些情况下依然会存在重复消费,比如自动提交,在下一次自动提交偏移量之前消费者挂掉了,重新上线就会出现重复消费。
三、相关配置参数
acks:默认值为1,acks=1代表需要分区的leader副本成功写入消息即会受到服务器的成功相应;acks=0代表消息发送之后不需要服务器的任何相应。acks=-1/all代表需要等待ISR中的所有副本都成功写入消息之后才能收到服务器的相应。
max.request.size:限制生产者客户端能发送的消息的最大值。默认值为1MB。需要和broker端的message.max.bytes联动配置。避免max.request.size超过此值得情况。
retries、retry.backoff.ms:retries用来配置生产者重试的次数,默认为0,只有重试次数达到配置的值并且依然发送失败才会抛出异常到应用。retry.backoff.ms默认值为100ms,代表重试之间间隔100ms。
compression.type:指定消息的压缩方式,默认值为‘none’。参数可配置为:gzip,snappy,lz4.
connections.max.idle.ms:指定多久之后关闭空闲的连接,默认值540000
linger.ms:用来指定生产者发送ProducerBatch之前等待更多的消息加入producerBatch的时间,默认值为0。
batch.size:指定producerBatch可以服用内存区域的大小默认16kb
buffer.memory:默认32M,生产者客户端中用户缓存消息的缓冲区大小
request.timeout.ms:用来指定producer等待请求相应的最长时间,默认为30000ms
metadata.fetch.timeout.ms:
auto.offset.reset:当没有初始偏移量或当前偏移量不存在时设置消费的起始点(earliest、latest、none三个值可选择),默认值earliest
enable.auto.commit:是否后台自动提交偏移量
auto.commit.interval.ms:后台自动提交消费偏移量(后台定时提交)的间隔时间,默认5000
group.id:配置消费者组
session.timeout.ms:30000 group coordinator 判断某个consumer下线的时间
heartbeat.interval.ms:10000 消费者向group coordinator 发送心跳包的时间
max.partition.fetch.bytes:102400 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB
max.poll.records:10 批量拉取消息条数
四、相关扩展接口
1、自定义分区器Partitioner
2、生产者拦截器ProducerInterceptor<String,String>、
3、消费者拦截器ConsumerInterceptor<String,String>
4、再均衡器ConsumerRebalanceListener
五、平台中的实践
1、生产者
a、平台中提交统一采用同步提交,除此之外也可以进行异步提交,通过回调来实现消息发送之后的对于成功或失败的处理
2、消费者
a、消费
1)、使用subscribe而不是assign进行订阅将分区的负载消费交给kafka而不是写死指定,能够进行故障转移,防止因为某个服务挂掉或者等原因造成该分区消息无法消费,但是也随之带来了新的问题,没当有消费者下线或者新消费者加入的时候会触发kafka的重平衡,因此在平台中还实现了ConsumerRebalanceListener接口可以在重平衡前提交当前的偏移量,避免重复消费的问题(如上所述,kafka消费者自动提交偏移量是每5秒定时进行偏移量的提交)
2)、以topic为键,消费者及其其他信息为值维护了一个map,并通过暴露接口来修改topic对应的值得状态,然后在每次消费的时候去根据topic去找到所有的消费者并过滤当前消费者,即可以使用consumer.pause、consumer.resume进行当前消费者对topic的暂停与恢复
3)、消费者创建流程
actualConsumer为实际的消费者,消息平台为批量消费。那么就需要如下一些参数,批量消费参数(消费者线程数,消费者处理任务的线程池)、kafka消费者配置(topic等其他一些通用参数)。那么这些参数都可以抽象到kafkaStarter里面进行初始化并传达到实际的消费者ActualConsumer。由于批量消费涉及到1、消费者实际处理任务的线程池,2、消费者线程数。因此我们又需要抽象一层KafkaMessageConsumer来处理具体的创建消费者的任务。