【生产者分析三】Kafka分区器选择【剖析】

1、生产者源码之Producer分区器选择深度剖析

  • 前面我们已经获取到元数据了,接下来我们就可以计算一下,一条消息会发送给topic的哪一个分区中,这里就涉及到底层的分区策略,分区器的选择。

  • 核心代码

    /**
     * todo:  第三步:根据分区器选择消息应该发送的分区
     * 根据元数据信息可以计算出消息应该发送到topic的哪一个分区中
     */
    int partition = partition(record, serializedKey, serializedValue, cluster);
    
  • partition方法分析

     //todo:如果发送消息的过程中,指定了分区编号,直接使用该分区编号就可以了
        //但是正常情况下,消息是没有指定分区编号的
        Integer partition = record.partition();
        return partition != null ?
                partition :
                //todo: 使用分区器进行分区
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
    
  • 默认分区器DefaultPartitionerpartition方法分析

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
           //todo: 首先获取我们要发送消息对应的topic的分区消息
           List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
           //todo: 计算topic的分区总数
           int numPartitions = partitions.size();
           //todo: 策略一:发送消息的时候 没有指定key
           if (keyBytes == null) {
               //定义一个计数器    每次执行实现加1的操作
               int nextValue = counter.getAndIncrement();
               //获取可用的分区信息
               List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
               if (availablePartitions.size() > 0) {
                   //一个自增的数对分区总数取模,来达到轮训的效果,达到负载均衡
                   //6 % 3=0
                   //7 % 3=1
                   //8 % 3=2
                   //9 % 3=0
                   int part = Utils.toPositive(nextValue) % availablePartitions.size();
                   //根据该值分配分区号
                   return availablePartitions.get(part).partition();
               } else {
                   // no partitions are available, give a non-available partition
                   return Utils.toPositive(nextValue) % numPartitions;
               }
           } else {
               //todo: 策略二:指定key
               // hash the keyBytes to choose a partition
               // 直接对key取hashcode值 % 分区总数 =分区编号  这样如果是同一个key,最后肯定是发往同一个分区中
               //如果想要让消息发往同一个分区中,必须要指定对应的key
               return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
           }
       }
    

2、Kafka分区策略汇总

Kafka全部分区策略【四大类】

To Be Continued

分完区后接下来该看下Kafka的RecordAccumulator是如何封装消息
【生产者分析四】RecordAccumulator封装消息流程

上一篇:0-完全开源的 Vue.js 入门级教程:HelloVue,发车啦!


下一篇:组合模式