1.目标
在我们的上一篇文章中,我们讨论了Kafka Producer。今天,我们将讨论Kafka Consumer。首先,我们将看到什么是Kafka Consumer和Kafka Consumer的例子。之后,我们将学习Kafka Consumer Group。此外,我们将看到Kafka Consumer的消费者记录API和配置设置。
创建Kafka Producer后,将消息发送到Apache Kafka集群。现在,我们正在创建一个Kafka Consumer来使用来自Kafka集群的消息。
所以,让我们详细讨论Kafka Consumer。
Apache Kafka Consumer | 卡夫卡消费者集团
2.什么是卡夫卡消费者?
从Kafka Topics读取数据的应用程序就是我们所说的Consumer。基本上,Kafka Consumer订阅了Kafka集群中的一个或多个主题,然后进一步提供来自Kafka主题的令牌或消息。
此外,使用Heartbeat,我们可以了解Consumer与Kafka Cluster的连接性。但是,让我们定义Heartbeat。它设置在Consumer,让Zookeeper或Broker Coordinator知道Consumer是否仍然连接到Cluster。因此,如果心跳不存在,Kafka Consumer将不再连接到群集。在这种情况下,经纪协调员必须重新平衡负载。此外,Heartbeat是群集的开销。此外,通过考虑数据吞吐量和开销,我们可以配置心跳为消费者的时间间隔。
什么是Apache Kafka Consumer
此外,我们可以对消费者进行分组,而Kafka中的消费者群体中的消费者可以共享他们订阅的Kafka主题的分区。要理解,如果主题中有N个分区,Kafka Consumer Group中的N个消费者和该组已订阅主题,则每个消费者将从主题的分区中读取数据。因此,我们可以说,这只是一个消费者可以成群结队的提醒。
让我们用命令修改Apache Kafka Operations
要具体来说,要连接到Kafka集群并使用数据流,Kafka的Consumer API会有所帮助。
下面是显示Apache Kafka Consumer的图片:
Apache Kafka Consumer的工作
要订阅一个或多个主题并处理在应用程序中生成的记录流,我们使用此Kafka Consumer API。换句话说,我们使用KafkaConsumer API来使用来自Kafka集群的消息。而且,下面看KafkaConsumer类的构造函数。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
- CONFIGS
返回消费者配置图。
KafkaConsumer类有以下重要方法:1。public
java.util.Set <TopicPar-tition> assignment()
获取当前由使用者分配的分区集。
2. public string subscription()
为了订阅给定的主题列表,获取动态分配的分区。
探索Kafka性能调优 - Kafka优化的方法
3. public void sub-scribe(java.util.List <java.lang.String> topics,ConsumerRe-balanceListener listener)
此外,订阅给定动态分配的主题列表分区。
4。 public void unsubscribe()
现在,取消订阅给定分区列表中的主题。
5. public void sub-scribe(java.util.List <java.lang.String> topics)
为了订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与unsubscribe()相同。
6. public void subscribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)
这里,参数模式引用正则表达式格式的订阅模式,并且listener参数从订阅模式获取通知。
7. public void as-sign(java.util.List <TopicParti-tion> partitions)
手动为客户分配分区列表。
8. 民意调查()
获取使用其中一个subscribe / assign API指定的主题或分区的数据。如果在轮询数据之前未订阅主题,则会返回错误。
9. public void commitSync()
为了提交所有订阅的主题和分区列表的最后一个poll()中返回的偏移量。对commitAsyn()应用相同的操作。
10. public void seek(TopicPartition partition,long offset)
获取消费者将在下一个poll()方法中使用的当前偏移值。
阅读Kafka的优点和缺点
11. public void resume()
为了恢复暂停的分区。
12. public void wakeup()
唤醒消费者。
3. ConsumerRecord API
基本上,要从Kafka集群接收记录,我们使用ConsumerRecord API。它包括一个主题名称,分区号,从中接收记录的偏移量也指向Kafka分区中的记录。此外,要创建具有特定主题名称,分区计数和<key,value>对的使用者记录,我们使用consumerRecord类。它的签名是:
- public ConsumerRecord (字符串主题,int分区,长偏移量,K键,V值)
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
- 话题
从Kafka群集收到的消费者记录的主题名称。
- 划分
主题的分区。
- 键
记录的密钥,如果没有密钥存在,则返回null。
- 值
记录内容。
学习Apache Kafka Streams | 流处理拓扑
4. ConsumerRecords API
基本上,它是ConsumerRecord的容器。要保留特定主题的每个分区的ConsumerRecord列表,我们使用此API。它的构造函数是:
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
- TopicPartition
返回特定主题的分区映射。
- 记录
返回ConsumerRecord列表。
这些是ConsumerRecords类的以下方法:
1。public int count()
所有主题的记录数。
2. public Set partitions()
具有此记录集中数据的分区集(如果未返回任何数据,则该集为空)。
3. public Iterator iterator()
通常,迭代器使您可以遍历集合,获取或删除元素。
4. public list records()
基本上,获取给定分区的记录列表。
5. ConsumerRecord API与ConsumerRecords API
一个。ConsumerRecord API
ConsumerRecord API是从Kafka接收的键/值对。它包含主题名称和分区号,从中接收记录以及指向Kafka分区中记录的偏移量。
湾 ConsumerRecords API
然而,ConsumerRecords API是一个容器,它为特定主题的每个分区保存ConsumerRecord列表。基本上,Consumer.poll(long)操作返回的每个主题分区都有一个ConsumerRecord列表。
Apache Kafka工作流程| Kafka Pub-Sub Messaging
6.配置设置
在这里,我们列出了Consumer客户端API的配置设置 -
1. bootstrap.servers
它引导了代理列表。
2. group.id
将个人消费者分配给一个组。
3. enable.auto.commit
基本上,如果值为true,则启用偏移的自动提交,否则不提交。
4. auto.commit.interval.ms
基本上,它返回更新的消耗偏移量写入ZooKeeper的频率。
5. session.timeout.ms
它表示Kafka在放弃并继续使用消息之前等待ZooKeeper响应请求(读或写)的毫秒数。
7. SimpleConsumer应用程序
确保生产者应用程序步骤在此处保持不变。这里也开始你的ZooKeeper和Kafka经纪人。此外,使用名为SimpleCon-sumer.java 的java类创建SimpleConsumer应用程序。然后键入以下代码:
阅读Apache Kafka职业范围与薪资趋势。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
A。汇编
通过使用以下命令,我们可以编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
B.执行
而且,使用以下命令我们可以执行应用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
C.输入
此外,打开生产者CLI并向主题发送一些消息。我们可以将简单输入作为'Hello Consumer'。
d. 产量
该输出是
- 订阅主题Hello-Kafka
- offset = 3 ,key = null,value = Hello Consumer
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
8.卡夫卡消费者集团
基本上,Kafka中的Consumer组是来自Kafka主题的多线程或多机器消费。
卡夫卡消费者 - 卡夫卡消费者集团
- 通过使用相同的group.id,消费者可以加入一个组。
- 组的最大并行度是组中的消费者数量←分区数。
- 此外,Kafka将主题的分区分配给组中的消费者。因此,每个分区仅由该组中的一个消费者使用。
- 此外,Kafka保证消息只能由组中的单个消费者读取。
- 消费者可以按照日志中存储的顺序查看消息。
看看Storm Kafka与配置和代码
的集成 a。重新平衡消费者
基本上,增加更多流程/线程将导致Kafka重新平衡。基本上,如果任何消费者或代理以某种方式无法向ZooKeeper发送心跳,则可以通过Kafka群集重新配置它。此外,在此重新平衡期间,Kafka将可用分区分配给可用线程,可能将分区移动到另一个进程。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
ii. Compilation
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
iii. Execution
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
因此,我们可以看到我们创建了名称的样本组,my-group和两个消费者。
湾 输入
现在,在打开生产者CLI后,发送一些消息,如 -
Test consumer group 01
Test consumer group 02
- 测试消费者组01
- 测试消费者组02
C。第一个过程的输出
学习Apache Kafka用例| 卡夫卡应用程序
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
- 订阅主题Hello-kafka
- offset = 3 ,key = null,value =测试消费者组01
d。此外,第二个过程的输出
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
- 订阅主题Hello-kafka
- offset = 3 ,key = null,value =测试消费者组02
所以,这完全是关于Kafka的Apache Kafka消费者和消费者群体的例子。希望你喜欢我们的解释。
9.结论:卡夫卡消费者
因此,我们通过使用Java客户端演示详细了解了Kafka Consumer和ConsumerGroup。此外,通过这个,我们了解了如何使用Java客户端发送和接收消息。此外,我们讨论了Kafka Consumer记录API和Consumer Records API以及两者的比较。此外,我们还学习了Kafka Consumer客户端API的配置设置。但是,如果有任何疑问,请随时在评论部分询问。
另请参阅 -
Kafka Broker
供参考