Kafka简介
高性能的消息发送与高性能的消息消费
快速入门
创建topic
启动服务后,我们需要创建一个主题(topic)用于消息的发送与接收,并且分区(partition)数、副本(replica)数都指定为一
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test -- partitions 1 --replication-factor 1
查看topic状态
bin/kafka-topic.sh --describe --zookeeper localhost:2181 --topic test
发送消息
使用Kafka提供的脚本往topic发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费消息
使用Kafka提供的脚本接收topic下的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
消息引擎系统
企业消息引擎系统是企业发布的一组规范。公司使用这组规范实现不同系统之间传递语义准确的消息。在实际使用场景中,消息引擎系统通常以软件接口为主要形式,实现松耦合的异步数据传递语义。设计一个消息引擎系统需要考虑两个重要因素
- 消息设计
- 传输协议设计
消息设计
消息引擎系统在设计消息时一定要考虑语义的清晰和格式上的通用性。消息通常采用结构化的方式进行设计,Kafka消息是二进制方式来保存,但依然是结构化的消息
传输协议设计
消息引擎系统中更为关键的部分是如何设计消息传输协议。狭义说来消息传输协议指定了消息在不同系统之间的传输方式。Kafka自己设计了一套二进制消息传输协议
消息引擎范型
最常见的两种消息引擎范型是消息队列模型和发布/订阅模型。消息队列模型是基于队列提供消息传输服务的,多用于进程间通信及线程间通信。
该模型定义了消息队列、发送者和接收者,提供一种点对点(p2p)的消息传递方式,即发送者发送每条消息到队列的指定位置,接收者从指定位置获取消息,一旦消息被消费,就会从队列中移除该消息。每条消息由一个发送者生产且被一个消费者处理
发布/订阅模型,包含主题(topic)、发布者、订阅者。一个topic可以理解为逻辑语义相近的容器;发布者将消息生产出来发送到指定的topic;所有订阅了该topic的订阅者都可以接收到该topic下的所有消息
Kafka概要设计
Kafka是为解决互联网公司超大量级数据的实时传输,因此Kafka在一下四个方面做了精心设计:
- 吞吐量/延时
- 消息持久化
- 负载均衡和故障转移
- 伸缩性
吞吐量/延时
吞吐量与延时这两个指标通常是一对矛盾体,即调优某一个指标另一个指标会变差,但并非线性变化关系。例如Kafka处理一条消息需要花费2毫秒,那么吞吐量不会超过500条消息/秒,但如果采用批处理的思想,Kafka在发送消息时先等待8毫秒,此时消息延时变为10毫秒,但可能这8毫秒我们总共积累了1000条数据,那么系统整体吞吐量变成100000条/秒
Kafka高速写入操作。Kafka每次写入操作都是把数据写入到操作系统的页缓存中,然后由操作系统决定时机将页缓存刷到磁盘
- 操作系统页缓存是在内存中分配,所以消息写入的速度非常快
- Kafka不直接与底层文件系统交互,所有繁琐的I/O操作都由操作系统处理
- Kafka写操作采用追加方式,避免磁盘随机写操作
普通SAS磁盘随机读/写的吞吐量很慢,但对于顺序读/写操作却非常快,甚至超过内存随机I/o速度。这也是Kafka为什么采用追加的方式写入消息,即只能往日志文件末尾追加新写入消息,且不允许修改已写入消息的原因,也得益于此Kafka即使在普通磁盘上也有很高的消息发送吞吐量,可以轻松做到每秒写入几万甚至几十万条消息
Kafka高速读操作
- Kafka在读取消息时会首先尝试从OS的页缓存中读取,如果命中便把消息经页缓存直接发送到网络的Socket上,这个过程
是利用Linux平台sendfile系统调用做到的,即零拷贝技术。事实实际使用的是Java的FileChannel.transferTo方法实现。
Kafka大量使用页缓存,读取消息时大部分消息依然保存在页缓存中,不用穿透到物理磁盘上获取消息,从而极大的提升了消息读
取的吞吐量
综上所述Kafka从以下4点达到了高吞吐量、低延时设计目标
- 大量使用操作系统页缓存,内存操作速度快且命中率高
- Kafka不直接操作I/O操作,由操作系统完成
- 采用追加写入方式,摒弃缓慢的磁盘随机读/写操作
- 使用零拷贝技术加强网络间数据传输效率
消息持久化
Kafka会把消息持久化到磁盘,这样会带来以下好处
- 解耦消息发送与消息消费:Kafka本质上说是提供生产者-消费者模式的完整解决方案。通过将消息持久化到池盘,生产
者只需要将消息写入,而不用关心消费者,增加吞吐量 - 实现灵活的消息处理:为下游子系统(接收Kafka消息的系统)提供消息重演功能
负载均衡&故障转移
负载均衡
- Kafka通过智能化的分区领导者选举算法来实现集群中所有的机器上均等的机会分散partition和leader
故障转移
- Kafka采用会话机制的方式实现故障转移。每台Kafka服务器启动后会以会话的方式把自己注册到Zookeeper服务器上,一旦该服务器运转出现问题,与ZooKeeper的会话不能维持从而超时失效,此时Kafka集群将选举出一台服务器来代替这台服务
器继续工作
伸缩性
线性伸缩性是最理想的状态,但由于分布式系统中单点瓶颈的制约这种理想的线性伸缩几乎不可能。分布式系统中每台服务器都会维护很多内部状态,如果由服务器自己保存这些状态信息,则必须处理一致性问题。但如果服务是无状态的,状态由ZooKeeper这类协调服务来保存和管理,将极大降低服务之间状态共享
Kafka正是将状态统一交由ZooKeeper保管,扩展Kafka集群只用启动新的Kafka服务,与ZooKeeper建立会话即可。当然Kafka也会保存少量的内部状态
Kafka基本概念与术语
消息
Kafka的消息格式包含很多字段,其中有很多字段是用于管理消息的元数据字段,对用户来说是完全透明的。Kafka消息格式总共经历过3次变迁,他们被称为V0、V1和V2版本,目前大部分使用的是V1版本的消息格式
Kafka 0.10之前的版本消息格式为V0
- crc32(4B):crc32校验值,校验范围为magic至value之间
- magic(1B):消息格式版本号,此版本的magic值为0
- attributes(1B):消息的属性,低三位表示压缩类型,0表示NONE,1表示GZIP,2表示SNAPPY,3表示LZ4,其余位保留
- key length(4B):表示消息位的长度,如果位-1,表示没有设置key,即key=null
- key:可选,如果没有key则无此字段,消息建,对消息做partition时使用,决定消息被保存在某个topic的那个partition
- value length(4B):实际消息体的长度,如果为-1,则表示消息为空
- value:消息体
Kafka 0.10.0版本到0.11.0版本消息格式为V1:
V1版本比V0版本多一个8B的timestamp字段
- timestamp作用:
1.内部而言,影响日志保存,切分策略;
2.外部而言,影响消息审计,端到端延迟等功能扩展
Kafka 0.11.0版本之后的版本使用V2版消息格式
- 相对V0、V1,V2版本改动较大,引入了变长整型Varints和ZigZag编码
topic和partition
topic是一个逻辑概念,代表一类消息,通常可以使用topic区分实际业务。Kafka中的topic通常都会被多个消费者订阅,出于性能的考虑,Kafka并不是topic-message的两级结构,而是topic-partition-message的三级结构来分散负载,每个topic
包含若干个partition。
topic由多个partition组成,partition是不可修改的有序消息序列,每个partition有自己专属的partition号,通常从0开始,用户对partition唯一能做的就是往消息序列的尾部追加写入消息,partition上的每条消息都会分配一个唯一的序列号即offset,该位移值是从0开始顺序递增的整数,位移信息可以唯一定位到partition下的一条信息
offset
topic partition下每条消息都被分配一个位移值。每个Kafka消费端也有位移的概念。每条消息在某个partition中的
位移是固定的,但消费该partition的消费者的位移会随着消费进度前移,但不可超过该分区最新一条消息的位移
replica
分布式系统必然要实现高可靠性,而目前主要途径还是通过冗余机制即备份实现。这些备份日志在Kafka中被称为副本,副本分为两类:领导者副本和追随者副本,追随者副本是不能提供服务给客户端的,只能被动向领导者副本获取数据,而一旦领导者副本主机宕机,Kafka将从剩余追随者副本中选出新的领导者副本继续提供服务
leader和follower
Kafka的replica分为两个角色:领导者(leader)和追随者(follower)。leader提供对外服务,follower只是被动地追随leader的状态,保持与leader的同步,follower存在的唯一价值就是充当leader的候补:一旦leader挂掉立即从追随者中选出新的leader接替。Kafka保证同一个partition的多个replica一定不会分配到同一台broker上。
ISR
ISR全称in-sync replica即leader replica保持同步的replica集合。Kafka为partition动态维护一个replica集合。该集合中所有的replica保存的消息日志都与leader replica保持同步,只有在这个集合中的replica才能被选举为leader,也只有该集合中所有的replica都接收到了同一条消息,Kafka才会将消息置于"已提交"状态,即认为这条消息发送成功。Kafka承诺只要replica集合至少存在一个replica,那些"已提交"状态都消息就不会丢失即
- ISR中存在至少一个"存活的"replica
- "已提交"消息
正常情况,partition所有replica都因该与leader replica保持同步,即所有replica都在ISR中,但由于各种各样的原因,一小部分replica开始落后于leader replica进度,当滞后到一定程度时,Kafka会将这些replica"踢"出ISR,当这些replica重新"追上"leader的进度时,Kafka会将它们加回ISR中
Kafka使用场景
Kafka以消息引擎闻名,特别适合处理生产环境中那些流式数据
消息传输
Kafka非常适合替代传统消息总线或消息代理,解耦消息生产者和消息消费者,同时Kafka还具有更好的吞吐量、高性能、高可靠和容错性
网站日志行为日志追踪
Kafka最早是由于重建用户行为数据追踪系统。很多网站上的用户操作都会以消息的形式发送到Kafka的某个对应topic上。这些点击流中蕴藏了巨大的商机,很多公司使用机器学习或其它实时处理框架帮助收集并分析用户的点击流数据
审计数据收集
很多企业和组织对关键操作和运维进行监控和审计,这需要从运维应用程序处实时汇总操作步骤信息进行集中式管理。在这中场景下,Kafka能便捷的对多路消息进行实时收集,同时由于其持久的特性,使得后续离线审计成为可能
日志收集
Kafka最常见的使用方式–日志收集汇总解决方案。每个企业都会产生大量的服务日志,这些日志分散在不同的机器上。我们可以使用Kafka对它们进行全量收集,并集中送往下游的分布式存储(HDFS等)。比起主流的日志抽取框架(Apache Flume),Kafka有更好的性能,而且提供了完备的可靠性解决方案
Event Sourcing
Event Sourcing实际上是领域驱动设计(Domain-Driven Design, DDD)的名词,它使用事件序列来表示状态变更。Kafka也是用不可变的消息序列来抽象化表示业务消息的,因此Kafka特别适合作为这种应用的后端存储
流式处理
Kafka Steams,自0.10.0.0版本引入,无缝对接Kafka消息引擎功能