Kafka中Consumer订阅主题与分区的几种方式

subscribe()方法订阅

1.以集合的形式订阅多个主题,如果前后两次订阅了不同的主题,那么消费者以最后一次为准

@Override
public void subscribe(Collection<String> topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}

// 订阅了topic-demo主题
consumer.subscribe(Arrays.asList("topic-demo"));

2.以正则表达式的方式订阅主题,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息

@Override
public void subscribe(Pattern pattern) {
    subscribe(pattern, new NoOpConsumerRebalanceListener());
}

// 订阅了匹配topic-.*的主题,比如topic-demo
consumer.subscribe(Pattern.compile("topic-.*"));

assign()方法订阅

@Override
public void assign(Collection<TopicPartition> partitions)

这个方法只接受一个参数partitions用来指定需要订阅的分区集合

TopicPartition类的内容:topic,partition;分别表示分区所属的主题和自身的分区编号,下面是具体的类结构

/**
 * A topic name and partition number
 */
public final class TopicPartition implements Serializable {

    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        if (hash != 0)
            return hash;
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        this.hash = result;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopicPartition other = (TopicPartition) obj;
        if (partition != other.partition)
            return false;
        if (topic == null) {
            if (other.topic != null)
                return false;
        } else if (!topic.equals(other.topic))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息,然后获取到了PartitionInfo数据,从PartitionInfo中可以提取topic,partition等信息,然后将这些信息封装到TopicPartition 中

// 创建消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

ArrayList<TopicPartition> partitions = new ArrayList<>();
// 用来查询指定主题的元数据信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic-demo");
if (partitionInfos != null) {
    for (PartitionInfo tpInfo : partitionInfos) {
        // 获取主题以及分区
        partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
    }
}

// 订阅全部分区的功能
consumer.assign(partitions);

参考书籍:《深入理解Kafka:核心设计与实践原理》

上一篇:生产者消费者模型(实验)


下一篇:SpringCloud API 网关(Zuul)