subscribe()方法订阅
1.以集合的形式订阅多个主题,如果前后两次订阅了不同的主题,那么消费者以最后一次为准
@Override
public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
// 订阅了topic-demo主题
consumer.subscribe(Arrays.asList("topic-demo"));
2.以正则表达式的方式订阅主题,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息
@Override
public void subscribe(Pattern pattern) {
subscribe(pattern, new NoOpConsumerRebalanceListener());
}
// 订阅了匹配topic-.*的主题,比如topic-demo
consumer.subscribe(Pattern.compile("topic-.*"));
assign()方法订阅
@Override
public void assign(Collection<TopicPartition> partitions)
这个方法只接受一个参数partitions用来指定需要订阅的分区集合
TopicPartition类的内容:topic,partition;分别表示分区所属的主题和自身的分区编号,下面是具体的类结构
/**
* A topic name and partition number
*/
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
@Override
public int hashCode() {
if (hash != 0)
return hash;
final int prime = 31;
int result = 1;
result = prime * result + partition;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
this.hash = result;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TopicPartition other = (TopicPartition) obj;
if (partition != other.partition)
return false;
if (topic == null) {
if (other.topic != null)
return false;
} else if (!topic.equals(other.topic))
return false;
return true;
}
@Override
public String toString() {
return topic + "-" + partition;
}
}
KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息,然后获取到了PartitionInfo数据,从PartitionInfo中可以提取topic,partition等信息,然后将这些信息封装到TopicPartition 中
// 创建消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
ArrayList<TopicPartition> partitions = new ArrayList<>();
// 用来查询指定主题的元数据信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic-demo");
if (partitionInfos != null) {
for (PartitionInfo tpInfo : partitionInfos) {
// 获取主题以及分区
partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
}
}
// 订阅全部分区的功能
consumer.assign(partitions);
参考书籍:《深入理解Kafka:核心设计与实践原理》