我们知道每个Topic会分配为很多partitions,Producers会将数据分配到每个partitions中,然后消费者Consumers从partitions中获取数据消费,那么Producers是如何将数据分到partitions中?Consumers又怎么知道从哪个partitions中消费数据?
生产者往Topic写数据
我们从product.send方法入手,看看里面的具体实现,可以看到在调用send方法时,其内部是调用了doSend方法,在doSend方法中有一个获取partitions的方法
int partition = partition(record, serializedKey, serializedValue, cluster);
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
从上面代码中,首先先选择配置的分区,如果没有配置则使用默认的分区,即使用了DefaultPartitioner中的partition方法
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 没有可用的分区,则给一个不可用分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 根据key的hash值和分区数取模
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
上面代码先会根据topic获取所有的分区
1,如果key为null,则通过先产生随机数,之后在该数上自增的方式产生一个数nextValue,如果存在可用分区,将nextValue转为正数之后对可用分区进行取模操作,如果不存在可用分区,则将nextValue对总分区数进行取模操作
2,如果key不为空,就先获取key的hash值,然后和分区数进行取模操作
消费者从Topic读数据
kafka默认对消费分区指定了两种策略,分别为Range策略(org.apache.kafka.clients.consumer.RangeAssignor)和RoundRobin策略(org.apache.kafka.clients.consumer.RoundRobinAssignor),它们都实现了PartitionAssignor接口
Range策略
比如有10个分区,分别为P1、P2、P3、P4、P5、P6、P7、P8、P9、P10,三个消费者C1、C2、C3,消费如下图:
我们来看看源代码:
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 得到topic和订阅的消费者集合信息,例如{t1:[c1,c2,c3], t2:[c1,c2,c3,c4]}
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
// 将consumersPerTopic信息转换为assignment,memberId就是消费者client.id+uuid(kafka在client.id上追加的)
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
// 遍历每个Topic,获取所有的订阅消费者
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
// 如果Topic没有分区,则调过
if (numPartitionsForTopic == null)
continue;
// 将Topic的订阅者根据字典排序
Collections.sort(consumersForTopic);
// 总分区数/订阅者的数量 得到每个订阅者应该分配分区数
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 无法整除的剩余分区数量
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
//遍历所有的消费者
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
//分配到的分区的开始位置
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
// 分配到的分区数量(整除分配到的分区数量,加上1个无法整除分配到的分区--如果有资格分配到这个分区的话。判断是否有资格分配到这个分区:如果整除后余数为m,那么排序后的消费者集合中前m个消费者都能分配到一个额外的分区)
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
//给消费者分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
上面的代码添加了注释很清楚的展现了range的实现,对应上面的例子,如果有4个消费者C1、C2、C3、C4,那么根据上面的算法:
C1 -> [P1,P2,P3] ,C2 -> [P4,P5,P6] ,C3 -> [P7,P8] C4 -> [P9,P10] 。取余多出来的两个分区,由最前n个消费者来消费
RoundRobin策略
将主题的所有分区依次分配给消费者,比如有两个Topic:T1[P1,P2,P3,P4],T2[P5,P6,P7,P8,P9,P10],若C1、C2订阅了T1,C2、C3订阅了T2,那么C1将消费T1[P1,P3],C2将消费T1[P2,P4,P6,P8,P10],C3将消费T2[P5,P7,P9],如下图:
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
// 将消费集合先按字典排序,构建成一个环形迭代器
CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
// 按Topic的名称排序,得到Topic下的所有分区
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
assigner.next();
// 给消费者分配分区,并轮询到下一个消费者
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
/**
* 根据消费者得到订阅的Topic下的所有分区
* Topic按名称字典排序
*/
public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
SortedSet<String> topics = new TreeSet<>();
for (Subscription subscription : subscriptions.values())
topics.addAll(subscription.topics());
List<TopicPartition> allPartitions = new ArrayList<>();
for (String topic : topics) {
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic != null)
allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
}
return allPartitions;
}
本文来源自与公众号《百川分享会》:baichuanshare