1、生产者源码之Producer分区器选择深度剖析
-
前面我们已经获取到元数据了,接下来我们就可以计算一下,一条消息会发送给topic的哪一个分区中,这里就涉及到底层的分区策略,分区器的选择。
-
核心代码
/** * todo: 第三步:根据分区器选择消息应该发送的分区 * 根据元数据信息可以计算出消息应该发送到topic的哪一个分区中 */ int partition = partition(record, serializedKey, serializedValue, cluster);
-
partition
方法分析//todo:如果发送消息的过程中,指定了分区编号,直接使用该分区编号就可以了 //但是正常情况下,消息是没有指定分区编号的 Integer partition = record.partition(); return partition != null ? partition : //todo: 使用分区器进行分区 partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
-
默认分区器
DefaultPartitioner
的partition
方法分析public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //todo: 首先获取我们要发送消息对应的topic的分区消息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //todo: 计算topic的分区总数 int numPartitions = partitions.size(); //todo: 策略一:发送消息的时候 没有指定key if (keyBytes == null) { //定义一个计数器 每次执行实现加1的操作 int nextValue = counter.getAndIncrement(); //获取可用的分区信息 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //一个自增的数对分区总数取模,来达到轮训的效果,达到负载均衡 //6 % 3=0 //7 % 3=1 //8 % 3=2 //9 % 3=0 int part = Utils.toPositive(nextValue) % availablePartitions.size(); //根据该值分配分区号 return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { //todo: 策略二:指定key // hash the keyBytes to choose a partition // 直接对key取hashcode值 % 分区总数 =分区编号 这样如果是同一个key,最后肯定是发往同一个分区中 //如果想要让消息发往同一个分区中,必须要指定对应的key return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
2、Kafka分区策略汇总
To Be Continued
分完区后接下来该看下Kafka的RecordAccumulator是如何封装消息
【生产者分析四】RecordAccumulator封装消息流程