第一部分:kafka概述
一、定义(消息引擎系统)
一句话概括kafka的核心功能就是:高性能的消息发送与高性能的消息消费。
kafka刚推出的时候是以消息引擎的身份出现的,它具有强大的消息传输效率和完备的分布式解决方案,随着版本更新,在kafka0.10.0.0版推出了流式处理组件——Kafka Streams,使kafka交由下游数据处理平台做的事也可以自己做,自此kafka在消息引擎的基础上正式成为了一个流式处理框架。但无论是消息引擎还是流式处理平台,kafka的处理架构从未质变,概括如下:
图 kafka简要架构图
总结就是三句话:
- 生产者发送消息给kafka服务器;
- 消费者从kafka服务器读取消息;
- kafka服务器依托zookeeper集群进行服务的协调管理。
说到消息引擎,和它类似的术语就是消息队列和消息中间件,个人感觉称kafka为消息引擎更合理。因为“消息队列”名字给出了一个很不准确的暗示,仿佛它就是以队列的形式实现的;而消息中间件有点过度强调了“中间件”之嫌,使其真实用途不够明显。
消息引擎系统既然是在不同应用之间传输消息的系统,那么在设计时需要重点考虑的关键因素就是:消息设计、传输协议设计和消息引擎范型。
消息设计
消息引擎系统在设计消息时一定要考虑语义的清晰和格式上的通用性,消息通常都采用结构化的方式进行设计,比如XML格式、JSON格式的消息等,而kafka的消息是用二进制方式来保存的,但依然是结构化的消息。
传输协议设计
广义上的传输协议包括任何能够在不同系统间传输消息或是执行语义操作的协议或框架,比如RPC及序列化框架、Google的ProtoBuffers、阿里系的Dubbo等,而kafka自己设计了一套二进制的消息传输协议。(后面再讲)传输协议作为一个基础构建块,它服务于消息引擎系统实现的消息引擎范型。
消息引擎范型
最常见的两种消息引擎范型是消息队列模型和发布/订阅模型。
消息队列模型是基于队列提供消息传输服务的,其定义了消息队列、发送者、接收者,提供的是一种点对点的消息传递方式。一旦消息被消费就会从队列中移除该消息,每条消息由一个发送者生产出来,且只被一个消费者处理——发送者和消费者是一对一的关系,类似生活中之前的电话接线生的工作。
发布/订阅模型有主题的概念,一个主题可以理解为逻辑语义相近的消息的容器,该模型也定义了类似生产者、消费者的角色,即发布者(publisher)和订阅者(subscriber)。发布者将消息生产出来发送到指定的topic中,所有订阅了该topic的订阅者都可以接受到该topic下的所有消息,类似生活中报纸的订阅。
kafka通过引入消息组(consumer group)来同时支持这两种模型。(后面再讲)
二、概要设计
kafka的设计初衷就是为了解决互联网公司超大量级数据的实时传输,概要设计关键点:吞吐量/延时、消息持久化、负载均衡和故障转移、伸缩性。
1、吞吐量/延时
kafka的吞吐量就是指每秒能够处理的消息数或者每秒能处理的字节数。kafka的延时可以表示客户端发起请求与服务器处理请求并发送响应给客户端之间的这段时间。在实际使用场景中,这两个指标通常是一个矛盾体,但也不是等比例的此消彼长的关系。
kafka写入端实现高吞吐量低延时的方法原理:利用操作系统的页缓存和采用追加写入消息的方式。
kafka会持久化所有数据到磁盘,但是本质上每次写入操作都只是把数据写入到操作系统的页缓存中,然后由操作系统自行决定何时把页缓存中的数据写回磁盘上。正是得益于这种对磁盘的使用方式,使得kafka的写入操作是很快的。
这样设计有3个主要优势:
- 操作系统页缓存是在内存中分配的,所以消息写入的速度非常快;
- kafka不必直接与底层的文件系统打交道,所有繁琐的IO操作都交给操作系统来处理;
- kafka写入操作采用追加写入的方式,避免了磁盘的随机写操作(对于普通的物理磁盘(非固态硬盘)随机读/写的吞吐量的确很慢,但是磁盘的顺序读/写操作其实是很快的,速度甚至可以匹敌内存的随机I/O速度)。
kafka在设计时采用了追加写入消息的方式,即只能在日志文件末尾追加写入新的消息,且不能修改已写入的消息,因此它属于典型的磁盘顺序访问型操作。
kafka消费端实现高吞吐量低延时的方法原理:kafka把消息写入操作系统的页缓存中,同样地,kafka在读取消息时会首先尝试从操作系统的页缓存中读取,且大部分消息很可能依然存在于页缓存中,如果命中就把消息经页缓存直接发送到网络的socket上,不用“穿透”到底层的物理磁盘上获取消息,同时这个过程用到了大名鼎鼎的零拷贝(zero copy)技术。
补充说明:传统的Linux操作系统中的I/O接口是依托于数据拷贝来是实现的,在零拷贝技术出现之前,一个I/O操作会将同一份数据进行多次拷贝,数据传输过程中还涉及到内核态与用户态的上下文切换,CPU的开销非常大,极大限制了操作系统高效进行数据传输的能力,而零拷贝技术很好的改善了这个问题。
【总结】kafka依靠下面4点达到了高吞吐量、低延时的设计目标:
- 大量使用操作系统页缓存,内存操作速度快且命中率高;
- kafka不直接参与物理I/O操作,而是交由最擅长此事的操作系统来完成;
- 采用追加写入方式,摒弃了缓慢的磁盘随机读/写操作;
- 使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率;
2、消息持久化
kafka是要持久化消息到磁盘上的,这样做的好处是:
- 解耦消息发送和消息消费:通过将消息持久化使得生产者方不再需要直接和消费者方耦合,它只是简单的把消息生产出来并交由kafka服务器保存即可;
- 实现灵活的消息处理:可以很方便的实现消息重演,即对于已经处理过的消息可能在未来某个时间点需要重新处理一次。
普通系统在实现持久化时可能会先尽量使用内存,当内存资源耗尽时再一次性的把数据“刷盘”,而kafka则反其道而行之,所有数据都会立即被写入文件系统的持久化日志中,之后kafka服务器才会返回结果给客户端通知它们消息已被成功写入。这样能减少kafka程序对内存的消耗从而将节省出来的内存留给页缓存使用,更进一步提升性能。
3、负载均衡和故障转移
负载均衡就是指让系统的负载根据一定的规则均衡地分配在所有参与工作的服务器上,从而最大限度的提升系统整体的运行效率。
对于kafka来说就是,每台服务器broker都有均等的机会为kafka的客户提供服务,可以把负载分散到所有集群中的机器上。
kafka通过智能化的分区领导者选举来实现负载均衡,kafka默认提供智能的leader选举算法,可在集群的所有机器上以均等机会分散各个partition的leader,从而整体上实现负载均衡。
kafka的故障转移是通过使用会话机制实现的,每台kafka服务器启动后会以会话的形式把自己注册到zookeeper服务器上。一旦该服务器运转出现问题,与zookeeper的会话便不能维持从而超时失效,此时kafka集群会选举出另一台服务器来完全替代这台服务器继续提供服务。
4、伸缩性
伸缩性是指向分布式系统中增加额外的计算资源比如CPU、内存、存储或带宽等时吞吐量提升的能力。
如果一个CPU的运算能力是U,那么两个CPU的运算能力我们自然希望是2U,即可以线性的扩容计算能力,但是由于很多隐藏的“单点”瓶颈导致实际中几乎不可能达到。阻碍线性扩容的一个很常见的因素就是状态的保存,因为无论哪类分布式系统,集群中的每台服务器一定会维护很多内部状态,如果有服务器自己来保存这些状态信息,则必须要处理一致性的问题。相反,若服务器是无状态的,状态的保存和管理交由专门的协调服务来做比如zookeeper,那么整个集群的服务器之间就无需繁重的状态共享,就极大地降低了维护复杂度。倘若要扩容集群节点,只需简单的启动新的节点机器进行自动负载均衡就可以了。kafka正是采用上述思想,将每台kafka服务器上的状态统一交由zookeeper保管,扩展kafka集群时只需启动新的kafka服务器即可。说明:kafka服务器上并不是所有状态都不保存,之保存了很轻量级的内部状态,因此整个集群间维护状态一致性的代价很低。
三、kafka基本概念和术语
1、消息
消息由消息头部、key和value组成。kafka中的消息格式由很多字段组成,其中很多字段都是用于管理消息的元数据字段能,对用户是透明的。V1版本的消息格式如下图(不同版本可能会有稍微差异):
图 消息的完整格式
kafka使用紧凑的二进制字节数组来保存字段,也就是没有多余的比特位浪费。通常的Java堆上内存分配,即使有重排各个字段在内存的布局以减少内存使用量的优化措施,但仍有部分字节用于补齐之用。同时,运行Java的操作系统通常都默认开启了页缓存机制,也就是说堆上保存的对象很可能在页缓存中还保留一份,这就造成了极大的资源浪费。kafka在消息设计时直接使用紧凑的二进制字节数组ByteBuffer而不是独立的对象,避开了繁重的java堆上内存分配。因此,我们至少能够访问多一倍的可用内存。还有一点,大量使用页缓存而非堆内存还有一个好处——数据不丢失,即当出现kafka broker进程崩溃时,堆内存上的数据也一并消失,但页缓存的数据依然存在。
2、主题和分区即topic和partition:
topic只是一个逻辑概念,代表一类消息,也可以认为是消息被发送到的地方,通常我们可以使用topic来区分实际业务。
kafka中的topic通常都会被多个消费者订阅,出于性能的考量,kafka并不是topic-message的两级结构,而是采用topic-partition-message的三级结构来分散负载。topic与partition关系如下图.
图 topic和partition
kafka的partition实际上并没有太多的业务含义,它的引入就是单纯的为了提升系统的吞吐量。
topic是有多个partition组成的,而partition是不可修改的有序消息序列,也可以说是有序的消息日志。每个partition有自己专属的partition号,通常是从0开始。用户对partition唯一 能做的就是在消息序列的末尾追加写入消息。partition上的每条消息都会被分配一个唯一的序列号——位移。位移值也是从0开始顺序递增的整数,通过位移信息可以唯一定位到某partition下的一条信息。
3、位移offset
topic partition下的每条消息都被分配一个位移值,而在kafka消费者端也有位移的概念,注意区分。每条消息在某个partition的位移是固定的,但消费该partition的消费者的位移会随着消费进度不断前移,但不会超过前者。因此,今后讨论位移的时候一定给出清晰的上下文环境。
综上,可以断言kafka中的一条消息其实就是一个<topic, partition,offset>三元组。
4、replica副本、leader、follower
kafka中的分区partition是有序消息日志,那为了实现高可靠性,通过冗余机制——备份多份日志,而这些备份日志在kafka中被称为副本(replica),它们存在的唯一目的就是防止数据丢失。
kafka中的replica分为两个角色:领导者(leader)和追随者(follower)(类似过去的主备的提法(Master-slave)),也即副本分为两类:领导者副本(leader replica)和追随者副本(follower replica)。follower replica是不能提供服务给客户端的,也即不负责响应客户端发来的消息写入和消息消费请求,它只是被动地向领导者副本获取数据,保持与leader的同步,follower存在的唯一价值就是充当leader的候补,一旦leader replica所在的broker宕机,kafka会从剩余的replica中选举出新的leader继续提供服务。
图 kafka的leader-follower系统
5、ISR(与leader replica保持同步的replica集合)
比如一个partition可以配置N个replica,那么是否就以为着该partition可以容忍N-1个replica失效而不丢失数据呢?答案是“否”。
kafka为partition动态维护一个replica集合,该集合中的所有replica保存的消息日志都与leader replica保持同步状态,只有这个集合中的replica才能被选举为leader,也只有该集合中所有replica都接收到了同一条消息,kafka才会将该消息置于“已提交”状态,即认为这条消息发送成功。kafka能保证只要ISR集合中至少存在一个replica,那些“已提交”状态的消息就不会丢失——两个关键点:第一,ISR中至少存在一个“活着的”replica;第二,“已提交”消息。
正常情况下,partition的所有replica都应该与leader replica保持同步,即所有的replica都在ISR中,但因各种原因,小部分replica可能开始落后于leader replica的进度,当其滞后到一定程度时,kafka会将这些replica“踢出”ISR。相反,当这些replica重新“追上”了leader replica的进度时,kafka又会将它们加回到ISR中。这些都是自动维护的,不需人工干预。
四、kafka使用场景
1、消息传输:替代传统的消息总线等。
2、网站行为日志追踪:鉴于点击流数据量很大,kafka超强的吞吐量特性就有了用武之地。网站上的用户操作以消息的形式发送到kafka的某个对应topic中,然后使用机器学习或其他实时处理框架来帮助收集并分析。
3、审计数据收集:从各个运维应用程序处实时汇总操作步骤信息进行集中式管理,同时支持持久化特性,方便后续离线审计。
4、日志收集:各个机器上的分散日志,通过kafka进行全量收集,并集中送往下游的分布式存储如hdfs中。相对于其他主流的日志抽取框架比如flume,kafka有更好的性能,而且提供了完备的可靠性解决方案,同时还有低延时的特点。
5、流式处理:新版本kafka才推出的流式处理组件kafka streams,相对于典型的流式处理框架如Apache Storm、Apache Samza、Spark、Apache Flink等竞争力如何,让时间给出答案吧。
五、版本注意事项
自1.0.0版本开始,kafka版本号正式从原来的四位升级到了现在的3位,格式是<major>.<minor>.<patch>。
在kafka世界中,通常把producer和consumer统称为客户端即clients,这是与服务器端即broker相对应的。
选择kafka版本时要注意的几个分界点为:0.8版本才加入了集群间的备份机制;0.9.0.0版本开始才支持kafka security功能;0.10.0.0(含)之后的版本才有了流式处理组件kafka streams;但建议选择相对较新版本,功能更完善bug更少咯。
2014年kafka的创始人创办了公司——Confluent.io,从事商业化Kafka工具开发以及提供实时流式处理方面的产品。另外,confluent还分为开源版本和企业版本,企业版本中提供了对底层kafka集群完整的可视化监控解决方案以及一些辅助系统帮助管理集群,而开源版本与Apache社区的kafka并无太大区别。
第二部分:kafka线上环境部署
一、环境部署说明
略
二、集群环境规划
1、操作系统选型
除了现状的确是Linux服务器数量最多,单论它与kafka本身的相适性,Linux也要比Windows等操作系统更加适合部署kafka,能想到的原因有两个:I/O模型的使用和数据网络传输效率。
2、磁盘选型
使用机械硬盘完全可以满足kafka集群的使用,当然SSD更好。
关于JBOD(一堆普通磁盘的意思)和RAID(磁盘阵列)的选择,即使用一堆普通商用磁盘进行安装还是搭建专属的RAID呢?答案是具体问题具体分析。追求性价比的公司可以考虑使用JBOD.
3、磁盘容量规划
主要考虑以下因素:
- 新增消息数
- 消息留存时间
- 平均消息时间
- 副本数
- 是否启用压缩。
4、内存规划
kafka对于Java堆内存的使用不是很多,kafka将消息写入页缓存,一般情况下,broker所需的堆内存都不会超过6GB。
对于内存的规划建议如下:
- 尽量分配更多的内存给操作系统的page cache;
- 不要为broker设置过大的堆内存;
- page cache大小至少要大于一个日志段的大小(?)。
5、CPU规划
要追求多核而非高时钟频率。
6、带宽选择
规划建议为:
- 尽量使用高速网络;
- 根据自身网络条件和带宽来评估Kafka集群机器数量;
- 尽量避免使用跨机房网络。
7、kafka集群涉及的主要几类参数:
- broker端参数
- topic级别参数
- GC配置参数
- JVM参数
- OS参数。
第三部分:producer开发
一、序言
kafka内置有Java版本producer,而当前Apache kafka支持的第三方clients库有很多,这些第三方库基本上都是由非Apache kafka社区的人维护的,用户下载的是Apache kafka的话默认是不包含这些库的,需要单独下载对应的库。
Apache kafka封装了一套二进制通信协议,对于producer而言,用户几乎可以使用任意语言按照该协议进行编程,从而实现向kafka发送消息。
实际上内置的Java版本producer和上面列出的所有第三方库在底层都是相同的实现原理,这组协议本质上为不同的协议类型分别定义了专属的紧凑二进制字节数组格式,然后通过socket发送给合适的broker,之后等待broker处理完成后返回响应给producer。这样设计的好处就是具有良好的统一性——即所有的协议类型都是统一格式的,并且由于是自定义的二进制格式,这套协议不依赖任何外部序列号框架,从而显得轻量级也具有好的扩展性。
二、producer工作原理
说到producer,它的主要功能就是向某个topic的某个分区发送一条消息,所以它首先需要确定到底要向topic的哪个分区写入消息——这就是分区器做的事。
kafka producer提供了一个默认的分区器,对于每条待发送的消息,如果该消息指定了key,那么partitioner会根据key的哈希值来选择目标分区;若这条消息没有指定key,则partitioner使用轮训的方式确认目标分区,从而最大限度的保证消息在所有分区上的均匀性。
当然,producer提供了用户自行指定目标分区的API,即用户在消息发送时跳过partitioner直接指定要发送到的分区。另外,producer也允许用户实现自定义的分区策略而不使用默认的分区器。
第二,确认了目标分区之后,producer要做的第二个事就是寻找这个分区对应的leader,也就是该分区leader副本所在的kafka broker。因此,在发送消息时,producer也就有了多种选择来实现消息发送(比如不等待任何副本的响应便返回成功、只是等待leader副本响应写入操作后再返回成功等)。
producer简言之就是将用户待发送的消息封装成一个ProducerRecord对象,然后使用KafkaProducer.send方法进行发送。具体过程为:Producer首先使用一个线程(用户主线程,也即用户启动Producer的线程)将待发送的消息封装进一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再结合本地缓存的元数据信息由partitioner来确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而KafkaProducer中的另一个专门的sender I/O线程则负责实时地从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker。工作流程图如下图。
图 Java版本producer的工作流程
三、构造producer(详见Demo代码)
1、构造producer实例大致步骤
1>构造一个java.util.Properties对象,然后至少指定bootstrap.servers 、key.serializer、value.serializer这三个属性。
对于bootstrap.servers参数,若kafka集群中机器数很多,可只需指定部分broker即可,producer会通过该参数找到并发现集群中所有的broker。被发送到broker端的任何消息的格式必须是字节数组,因此消息的各个组件必须首先做序列化,然后才能发送到broker。一定注意的是,key.serializer和value.serializer两个参数必须是全限定名。
2>使用上一步中创建的Properties实例构造KafkaProducer对象。
/** 创建producer的时候同时指定key和value的序列化类,则不需在Properties中指定了。*/Serializer<String> keySerializer = new StringSerializer();Serializer<String> valueSerializer = new StringSerializer();Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
3>构造待发送的消息对象ProducerRecord,指定消息要被发送到的topic、分区及对应的key和value。注意,分区和key信息可以不用指定,有kafka自行确定分区。
4>调用KafkaProducer的send方法发送消息。
通过Java提供的Future同时实现了同步发送和异步发送+回调(Callback)两种发送方式。而上文代码清单中的调用方式实现了第三种发送方式——fire and forget即发送之后不管发送结果,在实际中不被推荐使用。真是使用场景中,同步和异步发送方式才是最常见的两种方式。
异步发送:实际上所有的写入操作默认都是异步的。send方法提供了回调类参数来实现异步发送以及发送结果的响应,具体代码如下:
/** 发送消息后的回调类Callback实际上是一个Java接口,用户可以创建自定义的Callback实现类来处理消息发送后的逻辑,* 只要该类实现org.apache.kafka.clients.producer.Callback接口即可。*/producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//两个参数不会同时非空if(exception == null) {//消息发送成功}else {//执行错误处理逻辑if(exception instanceof RetriableException) {
//处理可重试瞬时异常
}else {
//处理不可重试异常
}}}})
同步发送:调用Future.get()无限等待结果返回,即实现同步发送的效果,具体代码如下:
producer.send(record).get();//使用Future.get会一直等待直至Kafka broker将发送结果返回给producer程序.
【说明】无论同步发送和异步发送都有可能失败,当前kafka的错误类型包含两类:可重试异常和不可重试异常。所有可重试异常都继承自org.apache.kafka.common.errors.RetriableException抽象类。
5>关闭KafkaProducer。
producer程序结束时一定要关闭producer。提供有无参数的close方法和有超时参数close方法。在实际场景中,一定要慎用待超时参数的close方法。
2、producer的主要参数
acks:指定在给producer发送响应前,leader broker必须要确保已成功写入该消息的副本数。有3个取值:0、1和all。
acks | producer吞吐量 | 消息持久性 | 使用场景 |
0 | 最高 | 最差 | 1、完全不关心消息是否发送成功; 2、允许消息丢失(比如统计服务器日志等) |
1 | 适中 | 适中 | 一般场景即可 |
all或-1 | 最差 | 最高 | 不能容忍消息丢失 |
buffer.memory:指定producer端用于缓存消息的缓冲区大小,单位是字节,我们几乎可以认为该参数指定的内存大小就是producer程序使用的内存大小。
compression.type:指定是否压缩消息,默认是none。若要压缩直接指定压缩类型,目前kafka支持3中压缩算法:GZIP、Snappy和LZ4,根据实际使用经验producer结合LZ4的性能最好。
四、消息自定义分区机制(详见Demo代码)
producer提供了默认的分区策略及对应的分区器供用户使用,但有时候用户可能想实现自己的分区策略,就需要用户自定义实现。若要使用自定义分区机制,用户需要做两件事:
1、在producer程序中创建一个类,实现org.apache.kafka.clients.producer.Partitioner接口。主要分区逻辑在Partitioner.partition中实现;
2、在用于构造KafkaProducer的Properties对象中设置partitioner.class参数。
五、自定义序列化(详见Demo代码)
kafka支持用户自定义消息序列化,需要完成的3件事:
1、定义数据对象格式;
2、创建自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口,在serializer方法中实现序列化逻辑;
3、在用于构造KafkaProducer的Properties对象中设置key.serializer或value.serializer.
六、producer拦截器(详见Demo代码)
实现定制化逻辑,实例实现了一个简单的双interceptor组成的拦截链。
七、无消息丢失配置
KafkaProducer.send方法仅仅把消息放入缓冲区中,由一个专属I/O线程负责从缓冲区中提取消息并封装进消息batch中,然后发送出去,而这个过程中存在着数据丢失的窗口:若I/O线程发送之前producer崩溃,则存在缓冲区中的消息全部丢失了。采用同步发送不会丢数据,但是性能会很差,实际场景中不推荐使用,因此最好能有一份配置,既使用异步方式还能有效避免数据丢失。
1、producer端配置
block.on.buffer.full = true
acks = all or –1
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
使用带回调机制的send发送消息,即KafkaProducer.sent(record, callback)
Callback逻辑中显式地立即关闭producer,使用close
2、broker端参数配置
unclean.leader.election.enable = false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit = false
八、 producer多线程处理
存在两种基本的使用方法:多线程单KafkaProducer实例 + 多线程多KafkaProducer实例。
两种KafkaProducer使用方式比较
说明 | 优势 | 劣势 | |
单KafkaProducer实例 |
所有线程共享一个KafkaProducer实例 |
实现简单,性能好 |
1、所有线程共享一个内存缓冲区,可能需要较多内存;2、一旦producer某个线程崩溃导致KafkaProducer实例被“破坏”,则所有用户线程都无法工作。 |
多KafkaProducer实例 |
每个线程维护自己专属的KafkaProducer实例 |
1、每个用户线程拥有专属的KafkaProducer实例、缓冲区空间及一组对应的配置参数,可以进行细粒度的调优;2、单个KafkaProducer崩溃不会影响其他producer线程工作 |
需要较大的内存分配开销 |
【建议】如果是对分区数不多的Kafka集群而言,推荐使用第一种方法,即在多个producer用户线程*享一个KafkaProducer实例;若对那些拥有超多分区的集群而言,采用第二种方法具有较高的可控性,方便producer的后续管理。
第四部分:consumer开发
一、序言
1、版本对比
新旧版本consumer对比
编程语言 |
位移管理 |
API包名 |
主要使用类 |
||
新版本 |
使用消费者组(consumer group) |
Java |
新版本把位移提交到kafka的一个内部topic(__consumer_offsets)上。注意这个topic名字的前面有两个下划线 |
org.apache.kafka.clients.consumer.* |
KafkaConsumer |
旧版本 | 使用low-level consumer,分high-level和low-level两种API. | Scala | 旧版本把位移提交到zookeeper。 | kafka.consumer.* | ZookeeperConsumerConnector SimpleConsumer |
2、consumer分类
consumer分为两类:消费者组(consumer group)和独立消费者(standalone consumer),其中前者是由多个消费者实例(consumer instance)构成一个整体进行消费,而后者则单独执行消费操作。我们在讨论或开发consumer程序的时候,必须明确消费者上下文信息,即所使用的consumer的版本以及consumer的分类。
【消费者组】
- 一个consumer group可能有若干个consumer实例,当然一个group只有一个实例也是允许的;
- 对于同一个group而言,topic的每条消息只能被发送到group下的一个consumer实例上;
- topic消息可以被发送到多个group中。
kafka就是通过consumer group实现了对基于队列和基于发布/订阅两种消息引擎模型的支持的:
- 所有consumer实例都属于相同group——实现基于队列的模型,每条消息只会被一个consumer实例处理;
- 所有consumer实例都属于不同group——实现基于发布/订阅的模型。
group.id唯一标示一个consumer group,一个consumer实例可以是一个线程或是运行在其他机器上的进程。
3、位移相关说明
这里的位移指的是consumer端的offset,与分区日志中的offset是不同的含义。
很多消息引擎是把消费端的offset保存在服务器端(broker),这样做的好处是实现简单,但会存在下面的问题:
- broker从此变成了有状态的,增加了同步成本,影响伸缩性;
- 需要引入应答机制来确认消费成功;
- 由于要保存许多consumer的offset,故必然引入复杂的数据结构,从而造成不必要的资源浪费。
而kafka选择让consumer group保存offset,只需要保存一个长整型数据即可。当前kafka consumer在内部使用一个map来保存期订阅topic所属分区的offset。
新版本consumer把位移提交到kafka的一个内部topic(__consumer_offsets)上,用户应尽量避免执行该topic的任何操作。
二、构造consumer(详见Demo代码)
1、构造consumer实例大致步骤
1、构造一个java.util.Properties对象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值;
2、使用上一步创建的Properties实例构造KafkaConsumer对象;
3、调用KafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表;
注意subscribe方法不是增量式的,后续的subscribe调用会完全覆盖之前的订阅语句。
4、循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息;
poll函数的参数是一个超时设定,通常如果consumer拿到了足够多的可用数据,那么它可立即从该方法返回;但若当前没有足够多的数据可供返回,consumer会处于阻塞状态,这个超时参数即控制阻塞的最大时间。这个超时设定给予了用户能够在consumer消费的同时定期去执行其他任务(但不知道具体实现)。否则设定一个比较大的值甚至是Integer.MAX_VALUE是不错的建议。
5、处理获取到的ConsumerRecord对象;
拿到这些kafka消息后consumer通常都包含处理逻辑,也即consumer的目的不仅是要从kafka处读取消息,还要对获取到的消息进行有意义的业务级处理。从kafka consumer的角度来说,poll方法返回即认为consumer成功消费了消息,但我们用户的观点通常认为是执行完真正的业务级处理之后才算消费完毕。因此,对于“consumer处理太慢”的问题要从两个方面定位明确瓶颈:第一,如果是poll返回消息的速度过慢,那么可以调节相应的参数来提升poll方法的效率;第二,若消息的业务级处理逻辑过慢,则应该考虑简化处理逻辑或把处理逻辑放入单独的线程执行。
6、关闭KafkaConsumer。
consumer脚本命令:目前来说,kafka所有命令行脚本表示相同含义的参数都不是统一的名字,比如consumer脚本中的名字是bootstrap-server,到了producer脚本中变成了broker-list,而在创建主题脚本中又变成了zookeeper。
三、订阅topic
1、订阅列表
在consumer group订阅topic列表使用下面语句即可:
consumer.subscribe(Arrays.asList("topic1","topic2","topic3"));
在独立consumer(standalone consumer),订阅列表则使用下面语句实现手动订阅:
TopicPartition tp1 = new TopicPartition("topic-name", 0);TopicPartition tp2 = new TopicPartition("topic-name", 1);consumer.assign(Arrays.asList(tp1, tp2));
2、基于正则表达式订阅topic
使用基于正则表达式的订阅必须指定ConsumerRebalanceListener,该类是一个回调接口,用户需要通过实现这个接口来是吸纳consumer分区分配方案发生变更时的逻辑。
如果用户使用的是自动提交(即设置enable.auto.commit=true),则通常不用理会这个类,用下面实现类即可。
consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
但是当用户手动提交位移的,则至少要在ConsumerRebalanceListener实现类的onPartitionsRevoked方法中处理分区分配方案变更时的位移提交。
四、consumer.poll方法剖析(详见Demo代码)
1、poll的内部原理
kafka的consumer是用来读取消息的,且要能够同时读取多个topic的多个分区的消息。若要实现并行的消息读取,一种方法是使用多线程的方式,为每个要读取的分区都创建一个专有的线程去消费(这其实就是旧版本consumer采用的方式);另一种方法是采用类似Linux I/O模型的poll或select等,使用一个线程来同时管理多个socket连接,即同时与多个broker通信实现消息的并行读取(这就是新版consumer最重要的设计改变)。
新版本Java consumer是一个多线程或说是一个双线程的Java进程:创建KafkaConsumer的线程被称为用户主线程,同时consumer在后台会创建一个心跳线程。KafkaConsumer的poll方法在用户主线程中运行,而一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调、消费者组的rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行。
2、poll使用方法
KafkaConsumer.poll方法引入参数的作用:
- 第一,超时设定;
- 第二,是想让consumer程序有机会定期“醒来”去做一些其他的事情,这是超时设定的最大意义。
poll的使用方法总结如下:
- consumer需要定期执行其他子任务:推荐poll(较小超时时间)+运行标识布尔变量的方式;
- consumer不需要定期执行子任务:推荐poll(MAX_VALUE)+捕获WakeupException的方式。
需要定期执行的代码:
try {while (isRunning){//将isRunning标示为volatile型,然后在其他线程中设置isRunning=false来控制consumer的结束。// 4、循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息;ConsumerRecords<String, String> records = consumer.poll(1000);// 5、处理获取到的ConsumerRecord对象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} finally {// 千万不要忘记!!关闭KafkaConsumer。它不仅会清除consumer创建的各种socket资源,还会通知消费者组coordinator主动离组从而更快的开启新一轮rebalance。consumer.close();}
不需要定期执行的代码:
try {while (true){//设置为trueConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);//在consumer程序未获取到足够多数据时无限等待,然后通过捕获WakeupException异常来判断consumer是否结束。需要在另一个线程中调用consumer.wakeup()方法来触发consumer的关闭。// 5、处理获取到的ConsumerRecord对象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} catch(WakeupException e) { // 此处忽略此异常的处理 }finally {// 千万不要忘记!!关闭KafkaConsumer。它不仅会清除consumer创建的各种socket资源,还会通知消费者组coordinator主动离组从而更快的开启新一轮rebalance。consumer.close();}
说明:KafkaConsumer不是线程安全的,但有一个例外就是wakeup方法,用户可以安全地在另一个线程中调用consumer.wakeup(). 其他KafkaConsumer方法都不能同时在多线程中使用。
五、交付语义
offset对于consumer非常重要,因为它是实现消息交付语义保证的基石,常见的3种消息交付语义保证如下:
- 最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理;
- 最少一次(at least once)处理语义:消息不会丢失,但可能被处理多次;
- 精确一次(exactly once)处理语义:消息一定会被处理且只会被处理一次。
六、自动提交和手动提交(详见Demo代码)
consumer默认是自动提交的,优势是降低用户的开发成本,劣势是用户不能细粒度地处理位移的提交。
所谓的手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移。一个典型的应用场景是:用户需要对poll方法返回的消息集合中的消息执行业务级处理,用户想要确保只有消息被真正处理完成后再提交位移,如果使用自动位移提交则无法保证这种时序性,这种情况就必须使用手动位移提交。
设置使用手动位移提交的步骤:
- 在构建KafkaConsumer时设置enable.auto.commit=false;
- 然后调用commitSync或commitAsync方法即可。
自动提交和手动提交的比较
使用方法 | 优势 | 劣势 | 交付语义保证 | 使用场景 | |
自动提交 | 默认不用配置或显示设置enable.auto.commit=true | 开发成本低,简单易用 | 无法实现精确控制,位移提交失败后不易处理 | 可能造成消息丢失,最多实现“最少一次”处理语义 | 对消息交付语义无需求,容忍一定的消息丢失 |
手动提交 | 设置enable.auto.commit=false;手动调用commitSync或commitAsync提交位移 | 可精确控制位移提交行为 | 额外的开发成本,须自行处理位移提交 | 易实现“最少一次”处理语义,依赖外部状态鹅考实现“精确一次”处理语义 | 消息处理逻辑重,不允许消息消失,至少要求“最少一次”处理语义 |
手动提交位移API进一步细分为同步手动提交和异步手动提交,即commitSync和commitAsync方法。当用户调用上面两个方法时,consumer会为所有它订阅的分区提交位移。它们还有带参数的重载方法。用户调用带参数的方法时需要指定一个Map显示地告诉kafka为哪些分区提交位移。consumer只对它所拥有的分区做提交时更合理的行为,因此跟推荐带参数的重载方法。下面是一段典型的手动提交部分分区位移的代码:
//下面代码按照分区级别进行位移提交。它首先对poll方法返回的消息集合按照分区进行分组,然后每个分区下的消息待处理完成后构造一个Map对象统一提交位移,从而实现了细粒度控制位移提交。 try {while (running) {ConsumerRecord<String, String> records = consumer.poll(1000);for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}
七、rebalance监听器(详见Demo代码)
新版本consumer默认是把位移提交到__consumer_offsets中,其实kafka也支持用户把位移提交到外部存储中,比如数据库中。若要实现这个功能,用户就必须使用rebalance监听器。
【注意】使用rebalance监听器的前提是用户使用consumer group,若使用的是consumer或是直接手动分配分区,那么rebalance监听器是无效的。
八、解序列化(详见Demo代码)
kafka consumer从broker端获取消息的格式是字节数组。自定义解序列化的步骤(同自定义序列化类似):
- 定义或复用serializer的数据对象格式;
- 创建自定义deserializer类,令其实现org.apache.kafka.common.serialization.Deserializer接口,在deserializer方法中实现deserialize逻辑;
- 在构造kafkaConsumer的Properties对象中设置key.deserializer或value.deserializer为上一步的实现类全限定名。
九、多线程消费实例(详见Demo代码)
下面介绍两种多线程消费的方法及实例代码:
1、每个线程维护一个KafkaConsumer
在这个方法中用户创建多个线程来消费topic数据,每个线程都会创建专属于该线程的KafkaConsumer实例,如图.
由图可知,consumer group由多个线程的KafkaConsumer组成,每个线程负责消费固定数目的分区。
2、单KafkaConsumer实例+多worker线程
本方法将消息的获取与消息的处理解耦,把后者放入单独的工作者线程中,即所谓的woker线程中,同时在全局维护一个或若干个consumer实例执行消息获取任务,如下图。
本例使用全局的kafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
第五部分:管理kafka集群
待补充。。。。
第六部分:监控kafka集群
待补充。。。。