Kafka Producer 的 partition 选择规则
-
不指定 key(keyBytes 为 null,交给 StickyPartitionCache 处理)
/**
 * Core send path of the producer (decompiled from KafkaProducer).
 *
 * Steps, in order: wait for topic metadata, serialize key and value, choose a partition,
 * validate the estimated record size, append the record to the accumulator (retrying once on a
 * possibly different partition if the accumulator asks to abort for a new batch), register the
 * partition with an active transaction, and wake the sender when a batch is full or newly created.
 *
 * @param record   the record to send
 * @param callback user callback; may be null (checked before use in the ApiException path)
 * @return the future from the accumulator append, or a FutureFailure wrapping an ApiException
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // Stays null if a failure happens before a partition is chosen; the interceptors are then notified with null.
    TopicPartition tp = null;
    try {
        this.throwIfProducerClosed();
        long nowMs = this.time.milliseconds();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            // Wait for metadata of the target topic; maxBlockTimeMs is passed as the wait bound.
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
        } catch (KafkaException var22) {
            // Re-wrap with a clearer message when the failure is due to the producer/metadata being closed.
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var22);
            }
            throw var22;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        // Whatever remains of the max-block budget after the metadata wait is handed to the accumulator.
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var21) {
            // The configured key serializer does not accept the key's runtime type.
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
        }
        byte[] serializedValue;
        try {
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var20) {
            // The configured value serializer does not accept the value's runtime type.
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
        }
        // Partition selection happens only after serialization, because the partitioner works on the serialized key bytes.
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        // Upper-bound size estimate; ensureValidRecordSize presumably rejects oversized records (not shown here).
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        // Use the caller-supplied timestamp if present, otherwise the current (post-metadata-wait) time.
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        }
        // Wraps the user callback so the interceptors also observe completion for this tp.
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.failIfNotReadyForSend();
        }
        // First attempt: the `true` argument lets the accumulator answer abortForNewBatch instead of creating a batch.
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        if (result.abortForNewBatch) {
            // A new batch would be needed: notify the partitioner (a sticky partitioner may switch partitions here),
            // recompute the partition, and retry the append with `false`, this time allowing batch creation.
            int prevPartition = partition;
            this.partitioner.onNewBatch(record.topic(), cluster, partition);
            partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
            }
            interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        // Register the final partition with the active transaction, if any.
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        // Wake the sender so a full or newly created batch gets sent.
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException var23) {
        // ApiExceptions are not thrown to the caller: they go to the callback and come back as a failed future.
        this.log.debug("Exception occurred during message send:", var23);
        if (callback != null) {
            callback.onCompletion((RecordMetadata)null, var23);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var23);
        return new KafkaProducer.FutureFailure(var23);
    } catch (InterruptedException var24) {
        // Checked InterruptedException is rethrown as Kafka's unchecked InterruptException.
        this.errors.record();
        this.interceptors.onSendError(record, tp, var24);
        throw new InterruptException(var24);
    } catch (KafkaException var25) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var25);
        throw var25;
    } catch (Exception var26) {
        // Any other exception: notify the interceptors (the error metric is not recorded here) and rethrow unchanged.
        this.interceptors.onSendError(record, tp, var26);
        throw var26;
    }
}
/**
 * DefaultPartitioner: picks the partition for a record.
 * If there is no serialized key, the sticky partition cache decides. Otherwise the murmur2 hash
 * of the key bytes, with its sign bit cleared, is reduced modulo numPartitions.
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes != null) {
        int nonNegativeHash = Utils.toPositive(Utils.murmur2(keyBytes));
        return nonNegativeHash % numPartitions;
    }
    return this.stickyPartitionCache.partition(topic, cluster);
}

/**
 * StickyPartitionCache: returns the sticky partition cached for the topic. If none is cached yet,
 * it selects one via nextPartition (with -1 meaning "no previous partition").
 */
public int partition(String topic, Cluster cluster) {
    Integer cached = (Integer) this.indexCache.get(topic);
    if (cached != null) {
        return cached;
    }
    return this.nextPartition(topic, cluster, -1);
}

/**
 * StickyPartitionCache: chooses a new sticky partition for the topic and returns the cached value.
 *
 * If a sticky partition is already cached and differs from prevPartition, it is kept as is.
 * Otherwise a new one is drawn at random:
 *   - if there are no available partitions, from all partitions of the topic;
 *   - if exactly one partition is available, that partition is used;
 *   - otherwise, from the available partitions, redrawing until the choice differs from the
 *     currently cached one.
 * The cache is updated with putIfAbsent (nothing cached) or replace(topic, prevPartition, new).
 * The value read back from the cache is returned, so a concurrent writer's choice can win.
 */
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> allPartitions = cluster.partitionsForTopic(topic);
    Integer current = (Integer) this.indexCache.get(topic);

    // Someone already moved the sticky partition off prevPartition: nothing to change.
    if (current != null && current.intValue() != prevPartition) {
        return (Integer) this.indexCache.get(topic);
    }

    List<PartitionInfo> candidates = cluster.availablePartitionsForTopic(topic);
    int candidateCount = candidates.size();
    Integer chosen = current;

    if (candidateCount == 0) {
        // No available partition: fall back to any partition of the topic.
        int draw = Utils.toPositive(ThreadLocalRandom.current().nextInt());
        chosen = draw % allPartitions.size();
    } else if (candidateCount == 1) {
        chosen = candidates.get(0).partition();
    } else {
        // Always draw at least once, then repeat while the draw equals the current sticky partition.
        do {
            int draw = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            chosen = candidates.get(draw % candidateCount).partition();
        } while (chosen.equals(current));
    }

    if (current != null) {
        this.indexCache.replace(topic, prevPartition, chosen);
    } else {
        this.indexCache.putIfAbsent(topic, chosen);
    }
    return (Integer) this.indexCache.get(topic);
}
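由上可知:如果没有指定 key(keyBytes 为 null),DefaultPartitioner 会交给 StickyPartitionCache 处理。它只按 topic 选分区,与 headers、value 无关。第一次选分区时,从该 topic 当前可用的 partition 列表中随机选一个;如果没有可用 partition,则从该 topic 的全部 partition 中随机选。选中的分区写入 indexCache,作为“粘性分区”。之后只要 indexCache 中已有值,就一直复用这个 partition。只有当 accumulator 需要为该 partition 新建 batch(append 返回 abortForNewBatch)时,doSend 才会调用 partitioner.onNewBatch 并重新计算分区。DefaultPartitioner.onNewBatch 内部会调用 nextPartition(topic, cluster, prevPartition)。在有多个可用分区时,它会随机选出一个不同于原分区(prevPartition)的新 partition,然后重试 append。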
-
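指定 key:对序列化后的 key(keyBytes)做 murmur2 哈希,再用 toPositive 清掉符号位,然后对 numPartitions 取模。因此只要 topic 的 partition 数不变,同一个 key 总是落在同一个 partition 上。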
/**
 * DefaultPartitioner: picks the partition for a record.
 * If there is no serialized key, the sticky partition cache decides. Otherwise the record goes to
 * toPositive(murmur2(keyBytes)) % numPartitions, so equal keys map to the same partition as long
 * as numPartitions does not change.
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes == null) {
        return this.stickyPartitionCache.partition(topic, cluster);
    }
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

/**
 * Computes the 32-bit MurmurHash2 of the given bytes (Utils.murmur2).
 *
 * The decompiled form declared the shift constant as {@code int r = true;}, which does not
 * compile; it is restored here as {@code r = 24}. The output is unchanged from the inlined
 * decompiled arithmetic.
 *
 * @param data bytes to hash; must not be null
 * @return the 32-bit hash, which may be negative (see toPositive)
 */
public static int murmur2(byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    // 'm' and 'r' are the MurmurHash2 mixing constants.
    final int m = 0x5bd1e995;
    final int r = 24;

    // Initialize the hash to a seed-derived value mixed with the input length.
    int h = seed ^ length;
    int length4 = length / 4;

    // Mix four bytes at a time (little-endian) into the hash.
    for (int i = 0; i < length4; i++) {
        final int i4 = i * 4;
        int k = (data[i4 + 0] & 0xff)
                + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16)
                + ((data[i4 + 3] & 0xff) << 24);
        k *= m;
        k ^= k >>> r;
        k *= m;
        h *= m;
        h ^= k;
    }

    // Fold in the 1-3 trailing bytes; the cases intentionally fall through.
    switch (length % 4) {
        case 3:
            h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            // fall through
        case 2:
            h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            // fall through
        case 1:
            h ^= data[length & ~3] & 0xff;
            h *= m;
            break;
        default:
            break;
    }

    // Final avalanche so the last few bytes affect all bits of the result.
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;

    return h;
}

/**
 * Maps an int to a non-negative int by clearing the sign bit.
 * This is not the absolute value: for example, toPositive(-1) == Integer.MAX_VALUE. Unlike Math.abs,
 * it never returns a negative value, which makes the result safe to use with {@code %} as an index.
 */
public static int toPositive(int number) {
    return number & 0x7fffffff;
}
-
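指定 partition:如果在 ProducerRecord 中显式指定了 partition,则直接使用该 partition,不再经过上述 key 哈希或粘性分区逻辑。但应尽量避免把所有消息(尤其是数据量很大时)都写入同一个 partition。否则会造成数据倾斜:该 partition 所在的 broker 负载过高,同一消费组内也只有一个消费者能消费它,严重时可能导致 Kafka 服务异常甚至不可用。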