Apache Kafka 0.9消费者客户端

当Kafka最初创建时,它与Scala生产者和消费者客户端一起运送。随着时间的推移,我们开始意识到这些API的许多限制。例如,我们有一个“高级”消费者API,它支持消费者组并处理故障转移,但不支持许多更复杂的使用场景。我们还有一个“简单”的消费者客户端,提供完全控制,但需要用户自己管理故障转移和错误处理。所以我们设定了重新设计这些客户端,以便开辟与老客户很难或不可能的许多用例,并建立一套我们可以长期支持的API。

第一阶段是在0.8.1中重写生产者API。最近的0.9版本完成了第二阶段,引入了新的Consumer API。基于卡夫卡本身提供的新的组织协调协议,新客户具有以下优势:

  • 清洁整合的API:新的消费者结合了旧的“简单”和“高级”消费者客户的功能,提供组合协调和低级别访问来构建您自己的消费战略。
  • 减少依赖性:新消费者是用纯Java编写的。它不依赖于Scala运行时或Zookeeper,这使它成为一个更轻松的图书馆,以包含在您的项目中。
  • 更好的安全性:在Kafka 0.9中实现的安全扩展只能由新的消费者支持。
  • 这个新的消费者还增加了一组用于管理容错消费者流程组的协议。以前,这个功能是用一个厚实的Java客户端实现的(与Zookeeper很大的交互)。这种逻辑的复杂性使得很难以其他语言构建功能齐全的消费者。随着这个新协议的引入,现在变得更加容易了。事实上,我们已经将C客户端转移到了这个协议。

虽然新消费者使用重新设计的API和新的协调协议,但这些概念并没有根本的不同,因此熟悉旧消费者的用户在理解它时不应该有太多的麻烦。然而,有一些微妙的细节,特别是关于组管理和线程模型,需要一些额外的关心。本教程的目的是涵盖新消费者的基本用法,并解释所有这些细节。

一个谨慎的一句话:在撰写本文时,新消费者在稳定性方面仍然被认为是“beta”。我们已经在0.9.0分支中修复了几个重要的错误,所以如果您遇到使用0.9.0.0版本的Kafka的任何问题,我们建议您对该分支进行测试。如果您仍然看到问题,请在邮件列表JIRA上报告。

入门

在编写代码之前,我们应该回顾一些基本概念。在卡夫卡(Kafka)中,每个主题分为一组称为分区的日志。生产者写在这些日志的尾部,消费者以自己的速度阅读日志。Kafka通过在消费者组中分配分区来缩放主题消耗,消费者组是共享公共组标识符的一组消费者。下图显示了一个具有三个分区和一个具有两个成员的消费者组的主题。主题中的每个分区只分配给组中的一个成员。

Apache Kafka 0.9消费者客户端

虽然旧消费者依赖Zookeeper进行团体管理,但新消费者使用Kafka本身内置的群组协调协议。对于每个组,其中一个经纪人被选为组协调员。协调员负责管理该组的状态。主要工作是在新成员到达时调停分区分配,老成员离开,主题元数据发生变化。重新分配分区的行为称为重新平衡组。

当初始化一个组时,消费者通常从每个分区中的最早或最新的偏移开始读取。然后依次读取每个分区日志中的消息。随着消费者的进步,它提交了已经成功处理的消息的偏移量。例如,在下图中,消费者的位置处于偏移6处,并且其最后提交的偏移量偏移1。

Apache Kafka 0.9消费者客户端

当分区被重新分配给组中的另一个消费者时,初始位置被设置为最后提交的偏移量。如果上述示例中的消费者突然崩溃,则接管分区的组成员将从偏移量1开始消费。在这种情况下,必须重新处理消息,直到消费者的位置为6。

该图还显示了日志中的另外两个重要位置。日志结束偏移量是写入日志的最后一条消息的偏移量。高水印是成功复制到所有日志副本的最后一条消息的偏移量。从消费者的角度来说,要知道的主要是你只能读取高水印。这样可以防止消费者读取以后可能丢失的未复制数据。

配置和初始化

要开始使用消费者,请将kafka-clients依赖项添加到您的项目中。maven代码片段如下:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0-cp1</version>
</dependency>

消费者使用属性文件构建,就像其他Kafka客户端一样。在下面的示例中,我们提供了使用消费者组所需的最小配置。

Properties props = new Properties();
props.put(“bootstrap.servers”,“localhost:9092”);
props.put(“group.id”,“consumer-tutorial”);
props.put(“key.deserializer”,StringDeserializer.class.getName());
props.put(“value.deserializer”,StringDeserializer.class.getName());
KafkaConsumer <String,String> consumer = new KafkaConsumer <>(props);

就像在老消费者和生产者一样,我们需要为消费者配置一个经纪人的初始列表,以便能够发现集群的其余部分。这不需要是集群中的所有服务器 - 客户端将确定列表中经纪人的全部活跃经纪人。我们在这里假设代理正在localhost上运行。消费者还需要被告知如何反序列化消息键和值。最后,要加入消费者组,我们需要配置组ID。在本教程中,我们将介绍更多的配置。

主题订阅

要开始消费,您必须首先订阅您的应用程序需要阅读的主题。在下面的例子中,我们订阅了主题“foo”和“bar”。

consumer.subscribe(Arrays.asList(“foo”,“bar”));

订阅后,消费者可以与该组的其余部分进行协调以获取其分区分配。当您开始使用数据时,这一切都将自动处理。稍后我们将展示如何使用assign API 手动分配分区,但请记住,无法混合自动和手动分配。

subscribe方法不是增量的:您必须包括要从中消费的主题的完整列表。您可以随时更改您订阅的主题集,当您拨打订阅时,以前订阅的任何主题将被新列表替换。

基本轮询循环

消费者需要能够并行获取数据,潜在地来自许多主题的许多分区,可能分散在许多经纪人之间。为此,它使用类似于在unix中的轮询或select调用的API样式:一旦注册了主题,所有将来的协调,重新平衡和数据提取将通过一个意向在事件循环中调用的一个轮询调用来驱动。这允许一个简单而有效的实现,可以处理来自单个线程的所有IO。

订阅主题后,您需要启动事件循环以获取分区分配并开始获取数据。这听起来很复杂,但您需要做的就是在一个循环中调用poll ,而消费者处理其余的操作。每次调用轮询从分配的分区返回一个(可能是空的)消息集。下面的示例显示了一个基本的轮询循环,它可以在获取的记录到达时打印出偏移量和值:

try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}

该调查API返回基于当前位置提取的记录。当组首次创建时,位置将根据重置策略(通常设置为每个分区的最早或最新偏移量)进行设置。一旦消费者开始提交偏移量,则每个稍后的重新平衡将重置位置到最后提交的偏移量。传递给轮询的参数控制消费者在等待当前位置记录时阻止的最大时间量。一旦任何记录可用,消费者立即返回,但如果没有可用的话,它将等待返回前指定的完整超时。

消费者被设计为以自己的线程运行。对于多线程使用而言,如果没有外部同步是不安全的,这可能不是一个好主意。在这个例子中,我们使用了一个标志,可以在应用程序关闭时用于从轮询循环中断。当该标志从另一个线程设置为false(例如关闭进程)时,一旦poll 返回并且应用程序完成处理返回任何记录,循环将中断。

完成使用后,应始终关闭消费者。这样做不仅可以清理任何使用的套接字,还可以确保消费者能够提醒协调人离开组。

此示例使用相对较小的超时来确保在关闭消费者时没有太多的延迟。或者,您可以使用长时间超时并使用wakeup API 从循环中断。

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + “: ” + record.value());
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}

我们已经将超时更改为使用Long.MAX_VALUE ,这基本上意味着消费者将无限期阻止,直到可以返回下一个记录。触发关机的线程不是设置标志,而是可以调用consumer.wakeup()来中断一个主动轮询,导致它抛出一个WakeupException异常。该API可以安全地从另一个线程使用。请注意,如果没有正在进行的正在进行的轮询,则会从下一次调用中引发异常。在这个例子中,我们捕获异常以防止它被传播。

把它放在一起

在下一个例子中,我们将把所有这些组合在一起构建一个简单的Runnable 任务,它初始化消费者,订阅主题列表,并无限期执行轮询循环,直到外部关闭。

public class ConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id; public ConsumerLoop(int id,
String groupId,
List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(“group.id”, groupId);
props.put(“key.deserializer”, StringDeserializer.class.getName());
props.put(“value.deserializer”, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
} @Override
public void run() {
try {
consumer.subscribe(topics); while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
} public void shutdown() {
consumer.wakeup();
}
}

要测试这个例子,你将需要一个运行0.9.0.0版本的Kafka代理和一些要消费的字符串数据的主题。将一串字符串数据写入主题的最简单的方法是使用kafka-verifiable-producer.sh 脚本。为了使其有意义,我们还应该确保主题有多个分区,以便一个成员不要做所有的工作。例如,使用单个Kafka代理和Zookeeper都可以在localhost上运行,您可以从Kafka发行版的根目录中执行以下操作:

# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor  --partitions  --zookeeper localhost:
# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages  --broker-list localhost:

然后,我们可以创建一个小型驱动程序来设置一个具有三个成员的消费者组,全部订阅了我们刚创建的相同主题

public static void main(String[] args) {
int numConsumers = 3;
String groupId = "consumer-tutorial-group"
List<String> topics = Arrays.asList("consumer-tutorial");
ExecutorService executor = Executors.newFixedThreadPool(numConsumers); final List<ConsumerLoop> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
consumers.add(consumer);
executor.submit(consumer);
} Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (ConsumerLoop consumer : consumers) {
consumer.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}

这个例子将三个可运行的消费者提交给执行者。每个线程都有一个单独的id,以便您可以看到哪个线程正在接收数据。停止进程时会调用shutdown hook,这将使用wakeup 停止三个线程 并等待它们关闭。如果你运行这个,你应该看到很多来自所有线程的数据。以下是一次运行的示例:

:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value = }
:{partition = ,offset = ,value= }

输出显示所有三个分区的消耗。每个分区已分配给其中一个线程。在每个分区中,您可以看到偏移量按预期增加。您可以使用Ctrl-C从命令行或通过IDE关闭进程。

消费者活性

当消费者组的一部分,每个消费者被分配一个子集的分组从它已经订阅的主题。这基本上是这些分区上的组锁。只要锁定,组中的其他成员就无法从中读取。当你的消费者健康时,这正是你想要的。这是避免重复消费的唯一方法。但是如果消费者因机器或应用程序故障而死亡,则需要释放该锁,以便将分区分配给健康成员。

卡夫卡的团队协调协议使用心跳机制解决了这个问题。在每次重新平衡之后,当代的所有成员开始向组协调员发送定期心跳。只要协调员继续接收心跳,它假定成员健康。在每次接收的心跳时,协调器启动(或复位)定时器。如果在定时器到期时没有收到心跳线,协调器会将成员标记为死机,并向组中的其余部分发信号通知他们应该重新加入,以便可以重新分配分区。定时器的持续时间称为会话超时,并在客户端上配置了session.timeout.ms 。

props.put(“session.timeout.ms”,“”);

会话超时确保如果机器或应用程序崩溃或网络分区将消费者与协调器隔离,则锁将被释放。然而,应用程序的故障通常会更加棘手。只是因为消费者仍然向协调人发送心跳并不一定意味着应用程序是健康的。

消费者的轮询循环旨在处理这个问题。当您调用poll 或其他阻塞API 时,所有网络IO都在前台完成。消费者不使用任何后台线程。这意味着,心跳只发送到当你调用协调调查。如果您的应用程序停止轮询(无论是因为处理代码已引发异常或下游系统已崩溃),则不会发送心跳线,会话超时将过期,并且该组将重新平衡。

唯一的问题是如果消费者花费超过会话超时来处理消息,则可能会触发虚假的重新平衡。因此,您应该将会话超时设置为足够大以使其不太可能。默认值为30秒,但将其设置为高达几分钟不是不合理的。更大会话超时的唯一缺点是协调器需要更长时间来检测真正的消费者崩溃。

交付语义

当消费者组首次创建时,根据auto.offset.reset 配置设置定义的策略设置初始偏移量。一旦消费者开始处理,它会根据应用程序的需要定期进行偏移。在每次后续的重新平衡之后,该位置将被设置为组中该分区的最后提交的偏移量。如果消费者在提交已成功处理的消息的偏移量之前崩溃,则另一个消费者将最终重复工作。您更频繁地提交偏移量,您将在崩溃中看到的重复次数越少。

在迄今为止的例子中,我们假设自动提交策略已启用。当将enable.auto.commit 设置为true(这是默认值)时,消费者会根据配置为“auto.commit.interval.ms”的间隔周期性地触发偏移提交。通过减少提交间隔,您可以限制消费者在发生崩溃时必须进行的重新处理的数量。

要使用消费者的提交API,您应该首先通过在用户配置中将enable.auto.commit 设置为false来禁用自动提交。

props.put(“enable.auto.commit”,“false”);

提交API本身很简单,但最重要的是它是如何集成到轮询循环中的。因此,以下示例包括具有粗体的提交详细信息的完整轮询循环。手动处理提交的最简单方法是使用同步提交API:

try{
while(running){
ConsumerRecords <String,String> records = consumer.poll(1000);
for(ConsumerRecord <String,String> record:records)
System.out.println(record.offset()+“:”+ record.value()); try{
consumer.commitSync();
} catch(CommitFailedException e){
// application specific failure handling
}
}
} finally {
consumer.close();
}

使用commitSync API不带任何参数犯上次调用返回的偏移轮询。此调用将无限期阻止,直到提交成功或失败,并显示不可恢复的错误。当消息处理所花费的时间超过会话超时时,您需要担心的主要错误发生。当这种情况发生时,协调者将消费者从组中踢出,从而导致CommitFailedException 抛出。您的应用程序应该通过尝试回滚自上次成功提交的偏移量以来消耗的消息导致的任何更改来处理此错误。

通常,您应确保仅在邮件成功处理完毕后才会提交偏移量。如果消息在发送提交之前崩溃,则消息将不得不被再次处理。如果提交策略保证最后提交的偏移量永远不会超过当前位置,那么您将具有“至少一次”传递语义。

Apache Kafka 0.9消费者客户端图3:提交的偏移超出当前位置

通过更改提交策略来保证当前位置不会超过上一个提交的偏移量,如上图所示,您将获得“最多一次”传递。如果消费者在其位置赶上最后提交的偏移量之前崩溃,那么该差距中的所有消息都将“丢失”,但是您可以确保不会多次处理任何消息。为了实现这个策略,我们只需要改变提交和消息处理的顺序。

try{
while(running){
ConsumerRecords <String,String> records = consumer.poll(1000); try{
consumer.commitSync();
for(ConsumerRecord <String,String> record:records)
System.out.println(record.offset()+“:”+ record.value());
} catch(CommitFailedException e){
// application specific failure handling
}
}
} finally {
consumer.close();
}

请注意,使用自动提交可以“至少处理一次”处理,因为消费者保证仅对已经返回给应用程序的消息进行偏移。您在最坏情况下可能需要重新处理的消息数量受应用程序在提交间隔期间可以处理的消息数量(由auto.commit.interval.ms 配置)的限制。

但是,通过使用commit API,您可以更好地控制您愿意接受多少重复处理。在最极端的情况下,您可以在处理每个消息后提交偏移量,如以下示例所示:

try{
while(running){
ConsumerRecords <String,String> records = consumer.poll(1000); try{
for(ConsumerRecord <String,String> record:records){
System.out.println(record.offset()+“:”+ record.value());
consumer.commitSync(Collections.singletonMap(record.partition(),new OffsetAndMetadata(record.offset()+ 1)));
}
} catch(CommitFailedException e){
// application specific failure handling
}
}
} finally {
consumer.close();
}

在这个例子中,我们在调用commitSync时传递了我们要提交的显式偏移量。承诺的偏移量应始终为应用程序读取的下一条消息的偏移量。当没有参数调用commitSync时,消费者将提交返回给应用程序的最后一个偏移量(加上一个),但是我们不能使用这个,因为它将允许承诺的位置超出我们的实际进度。

显然,在每个消息之后提交可能不是大多数用例的好主意,因为处理线程必须阻止从服务器返回的每个提交请求。这会杀死吞吐量。一个更合理的方法可能是在每N个消息之后提交N,N可以被调整以获得更好的性能。

在本示例中,commitSync 的参数是从主题分区到OffsetAndMetadata 实例的映射。提交API允许您在每个提交中包含一些其他元数据。这可以用于记录提交的时间,发送它的主机或应用程序所需的任何信息。在这个例子中,我们把它留空了。

在完成每个分区的消息处理后,不必承诺收到每条消息,更合理的策略可能是提交偏移量。所述ConsumerRecords 集合提供访问该组包含的分区,并为每个分区的消息。下面的例子演示了这个政策。

try{
while(running){
ConsumerRecords <String,String> records = consumer.poll(Long.MAX_VALUE);
for(TopicPartition partition:records.partitions ()){
List <ConsumerRecord <String,String >> partitionRecords = records.records(partition) ;
for(ConsumerRecord <String,String> record:partitionRecords)
System.out.println(record.offset()+“:”+ record.value()); long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastoffset + 1)));
}
}
} finally {
consumer.close();
}

迄今为止的示例集中在同步提交API,但消费者还暴露了异步API,commitAsync 。使用异步提交通常会提供更高的吞吐量,因为应用程序可以在提交返回之前开始处理下一批消息。权衡之处在于,您可能稍后才会发现提交失败。下面的例子显示了基本用法:

try{
while(running){
ConsumerRecords <String,String> records = consumer.poll(1000);
for(ConsumerRecord <String,String> record:records)
System.out.println(record.offset()+“:”+ record.value()); consumer.commitAsync(new OffsetCommitCallback(){
@
Override public void onComplete(Map <TopicPartition,OffsetAndMetadata> offsets,
Exception exception){
if(exception!= null){
//应用程序特定的故障处理
}
}
})
}
} finally {
consumer.close();
}

请注意,我们已经为commitAsync 提供了一个回调函数,当提交完成(成功或不成功)时,它将由使用者调用。如果你不需要这个,你也可以调用commitAsync 没有参数。

消费者群体检查

当消费者组处于活动状态时,您可以使用位于Kafka分发的bin目录中的consumer-groups.sh 脚本从命令行检查分区分配和消费进度。

#bin / kafka-consumer-groups.sh -new-consumer -describe -group consumer-tutorial-group -bootstrap-server localhost:

使用手动分配

如本教程开始所述,新的消费者对不需要消费者群体的用例实现较低级别的访问。这样做的便利性是采用这种API的最强的原因之一。旧的“简单”消费者也提供了这一点,但是它要求你自己做很多错误处理。使用新的消费者,您只需要分配要读取的分区,然后开始轮询数据。

下面的示例显示了如何使用partitionsFor API 从主题分配所有分区

List<TopicPartition> partitions = new ArrayList <>();
for(PartitionInfo partition:consumer.partitionsFor(topic))
partitions.add(new TopicPartition(topic,partition.partition())); consumer.assign(partitions);

类似于订阅,分配的调用必须通过要读取的分区的完整列表。一旦分配了分区,轮询循环就像以前一样工作。

不过一句话要小心。无论是简单的消费者还是消费者组,所有偏移提交都通过组协调器。因此,如果您需要提交偏移量,那么您仍然必须将group.id 设置为合理的值,以防止与其他消费者的冲突。如果一个简单的消费者尝试使用与活动消费者组匹配的组ID进行偏移,则协调器将拒绝该提交(这将导致CommitFailedException)。但是,如果另一个简单的消费者实例共享相同的组ID,则不会出现任何错误。

上一篇:Java中的对象都是在堆上分配的吗?


下一篇:火狐浏览器开始支持3D游戏和视屏通话