文章目录
Kafka生产者默认配置
参数名 | 默认值 | 参数说明 |
---|---|---|
retries | 0 | 设置大于零的值将导致客户端重新发送任何发送失败并可能出现暂时性错误的记录。 |
acks | 1 | 在认为请求完成之前,生产者要求领导者收到的确认数。acks=0 ,如果设置为零,那么生产者根本不会等待来自服务器的任何确认。acks=1 ,只要分区的Leader副本写入成功就有会返回。acks=-1 或acks=all ,需要所有副本都写入完毕。 |
compression.type | none | 生产者进行消息压缩配置,none 不压缩,值gzip ,snappy ,lz4 。压缩是对整批数据进行的,所以batching的效率也会影响压缩率(更多batching意味着更好的压缩)。 |
batch.size | 16384(16KB) | BatchRecords的大小 |
linger.ms | 0 | 在达不到批次消息大小的时候 可以调整等待周期,单位ms |
client.id | “ ” | 客户端id |
send.buffer.bytes | 128 * 1024 (128KB) | 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B |
receive.buffer.bytes | 32 * 1024 (32KB) | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。 如果值为 -1,则将使用操作系统默认值。单位B |
max.request.size | 1 * 1024 * 1024 (1MB) | 请求的最大大小(以字节为单位)。 此设置将限制生产者将在单个请求中发送的记录批次数,以避免发送大量请求。 这实际上也是最大记录批量大小的上限。 请注意,服务器有自己的记录批量大小上限,可能与此不同。单位B |
reconnect.backoff.ms | 50L | 在尝试重新连接到给定主机之前等待的基本时间。 这避免了在紧密循环中重复连接到主机。 此退避适用于客户端到代理的所有连接尝试。单位ms |
reconnect.backoff.max.ms | 1000L | 重新连接到反复连接失败的代理时要等待的最长时间(以毫秒为单位)。 如果提供,则每个主机的退避将在每次连续连接失败时呈指数增加,直至达到此最大值。 计算回退增量后,添加 20% 的随机抖动以避免连接风暴。单位ms |
retry.backoff.ms | 100L | 在尝试重试对给定主题分区的失败请求之前等待的时间。 这避免了在某些故障情况下在紧密循环中重复发送请求。单位ms |
max.block.ms | 60s | 配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。 由于缓冲区已满或元数据不可用,这些方法可能会被阻止。 用户提供的序列化程序或分区程序中的阻塞不会计入此超时。单位ms |
request.timeout.ms | 30s | 这应该大于replica.lag.time.max.ms (代理配置)以减少由于不必要的生产者重试而导致消息重复的可能性。 |
metadata.max.age.ms | 30s | 即使我们没有看到任何分区领导更改以主动发现任何新代理或分区,在这个时间内我们也会强制刷新元数据,单位ms。 |
metrics.sample.window.ms | 30s | 计算指标样本的时间窗口。 |
metrics.num.samples | 2 | 为计算指标而维护的样本数。 |
metrics.log.level | “0” | 指标输入日志级别,“0”:INFO,”1“:DEBUG |
metric.reporters | Collections.emptyList() | 用作指标报告器的类列表。 实现 org.apache.kafka.common.metrics.MetricsReporter 接口允许插入将在新指标创建时通知的类。 JmxReporter 总是包含在注册 JMX 统计信息中。 |
max.in.flight.requests.per.connection | 5 | 客户端在阻塞前将在单个连接上发送的最大未确认请求数。 请注意,如果此设置设置为大于 1 且发送失败,则存在由于重试(即启用重试)而导致消息重新排序的风险。 |
connections.max.idle.ms | 9 * 60 * 1000 | 在此配置指定的毫秒数后关闭空闲连接。默认值9分钟,单位ms。 |
partitioner.class | DefaultPartitioner.class | 分区器。实现 org.apache.kafka.clients.producer.Partitioner 接口的 Partitioner 类。 |
interceptor.classes | Collections.emptyList() | 用作拦截器的类列表。 实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口允许您在将生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)它们。 默认情况下,没有拦截器。 |
security.protocol | PLAINTEXT | 用于与代理通信的协议。 有效值为:Utils.join(SecurityProtocol.names(), ", ") 。 |
enable.idempotence | false | 幂等是否开启,当设置为“true”时,生产者将确保在流中写入每条消息的一个副本。 如果为“false”,则生产者由于代理失败等而重试,可能会在流中写入重试消息的副本。 请注意,启用幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于或等于 5, RETRIES_CONFIG 大于 0 且ACKS_CONFIG 必须为all 。 如果用户未明确设置这些值,则会选择合适的值。 如果设置了不兼容的值,则会抛出 ConfigException。 |
transaction.timeout.ms | 60s | 事务超时时间,事务协调器在主动中止正在进行的事务之前等待来自生产者的事务状态更新的最长时间(以毫秒为单位)。 如果此值大于代理中的 transaction.max.timeout.ms 设置,则请求将失败并显示lnvalidTransactionTimeout 错误。 |
transactional.id | null | 事务id,用于事务性交付的 TransactionalId。 这实现了跨越多个生产者会话的可靠性语义,因为它允许客户端保证使用相同 TransactionalId 的事务在开始任何新事务之前已经完成。 如果未提供 TransactionalId,则生产者仅限于幂等交付。 请注意,如果配置了 TransactionalId,则必须启用 enable.idempotence 。 默认为 null ,表示不能使用事务。 请注意,默认情况下,事务需要至少三个代理的集群,这是生产的推荐设置; 对于开发,您可以通过调整代理设置transaction.state.log.replication.factor 来改变这一点。 |
在ProducerConfig
类中static{}块。
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
in("all", "-1", "0", "1"),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(MAX_REQUEST_SIZE_CONFIG,
Type.INT,
1 * 1024 * 1024,
atLeast(0),
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG, Type.LONG, 1000L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
60 * 1000,
atLeast(0),
Importance.MEDIUM,
MAX_BLOCK_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
30 * 1000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
.define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(METRICS_RECORDING_LEVEL_CONFIG,
Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
atLeast(1),
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
Type.CLASS,
DefaultPartitioner.class,
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport()
.define(ENABLE_IDEMPOTENCE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
ENABLE_IDEMPOTENCE_DOC)
.define(TRANSACTION_TIMEOUT_CONFIG,
Type.INT,
60000,
Importance.LOW,
TRANSACTION_TIMEOUT_DOC)
.define(TRANSACTIONAL_ID_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC);
}
其中还有关于SSL和SASL的配置在.withClientSslSupport()
和.withClientSaslSupport()
中,涉及类SslConfigs
和SaslConfigs
。
Kafka消费者默认配置
参数名 | 默认值 | 参数说明 |
---|---|---|
group.id | " " | 标识此消费者所属的消费者组的唯一字符串。 如果消费者通过使用 subscribe(topic) 或基于 Kafka 的偏移管理策略使用组管理功能,则需要此属性。 |
session.timeout.ms | 10000 | 使用 Kafka 的组管理工具时用于检测消费者故障的超时时间。 消费者定期发送心跳以向代理表明其活跃度。 如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除此消费者并启动重新平衡。 请注意,该值必须在 group.min.session.timeout.ms 和 group.max.session.timeout.ms 在代理配置中配置的允许范围内 . |
heartbeat.interval.ms | 3000 | 使用 Kafka 的组管理工具时,消费者协调器的心跳之间的预期时间。 心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。 该值必须设置为低于 session.timeout.ms ,但通常不应设置为高于该值的 1/3。 它可以调整得更低,以控制正常重新平衡的预期时间。 |
partition.assignment.strategy | Collections.singletonList(RangeAssignor.class) | 消费者分配分区策略。 |
metadata.max.age.ms | 5 * 60 * 1000 | 即使我们没有看到任何分区领导更改以主动发现任何新代理或分区,在这个时间内我们也会强制刷新元数据,单位ms。 |
enable.auto.commit | true | |
auto.commit.interval.ms | 5000 | |
client.id | " " | |
max.partition.fetch.bytes | 1 * 1024 * 1024 | |
send.buffer.bytes | 128 * 1024 | |
receive.buffer.bytes | 64 * 1024 | |
fetch.min.bytes | 1 | |
fetch.max.bytes | 50 * 1024 * 1024 | |
fetch.max.wait.ms | 500 | |
reconnect.backoff.ms | 50L | |
reconnect.backoff.max.ms | 1000L | |
retry.backoff.ms | 100L | |
auto.offset.reset | “latest” | “latest”, “earliest”, “none”。 |
check.crcs | true | |
metrics.sample.window.ms | 30000 | |
metrics.num.samples | 2 | |
metrics.log.level | Sensor.RecordingLevel.INFO.toString() | |
metric.reporters | Collections.emptyList() | |
request.timeout.ms | 305000 | chosen to be higher than the default of max.poll.interval.ms
|
connections.max.idle.ms | 9 * 60 * 1000 | |
interceptor.classes | Collections.emptyList() | 用作拦截器的类列表。 实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口允许您在将生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)它们。 默认情况下,没有拦截器。 |
max.poll.records | 500 | |
max.poll.interval.ms | 300000 | |
exclude.internal.topics | true | |
internal.leave.group.on.close | true | |
isolation.level | “0” | |
security.protocol | CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL | |
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
10000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
.define(HEARTBEAT_INTERVAL_MS_CONFIG,
Type.INT,
3000,
Importance.HIGH,
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Collections.singletonList(RangeAssignor.class),
new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
Type.LONG,
5 * 60 * 1000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
.define(ENABLE_AUTO_COMMIT_CONFIG,
Type.BOOLEAN,
true,
Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
Type.INT,
5000,
atLeast(0),
Importance.LOW,
AUTO_COMMIT_INTERVAL_MS_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Importance.LOW,
CommonClientConfigs.CLIENT_ID_DOC)
.define(MAX_PARTITION_FETCH_BYTES_CONFIG,
Type.INT,
DEFAULT_MAX_PARTITION_FETCH_BYTES,
atLeast(0),
Importance.HIGH,
MAX_PARTITION_FETCH_BYTES_DOC)
.define(SEND_BUFFER_CONFIG,
Type.INT,
128 * 1024,
atLeast(-1),
Importance.MEDIUM,
CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
64 * 1024,
atLeast(-1),
Importance.MEDIUM,
CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(FETCH_MIN_BYTES_CONFIG,
Type.INT,
1,
atLeast(0),
Importance.HIGH,
FETCH_MIN_BYTES_DOC)
.define(FETCH_MAX_BYTES_CONFIG,
Type.INT,
DEFAULT_FETCH_MAX_BYTES,
atLeast(0),
Importance.MEDIUM,
FETCH_MAX_BYTES_DOC)
.define(FETCH_MAX_WAIT_MS_CONFIG,
Type.INT,
500,
atLeast(0),
Importance.LOW,
FETCH_MAX_WAIT_MS_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG,
Type.LONG,
50L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
Type.LONG,
1000L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(AUTO_OFFSET_RESET_CONFIG,
Type.STRING,
"latest",
in("latest", "earliest", "none"),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
.define(CHECK_CRCS_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
CHECK_CRCS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG,
Type.INT,
2,
atLeast(1),
Importance.LOW,
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(METRICS_RECORDING_LEVEL_CONFIG,
Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
305000, // chosen to be higher than the default of max.poll.interval.ms
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
Type.INT,
500,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
.define(MAX_POLL_INTERVAL_MS_CONFIG,
Type.INT,
300000,
atLeast(1),
Importance.MEDIUM,
MAX_POLL_INTERVAL_MS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW)
.define(ISOLATION_LEVEL_CONFIG,
Type.STRING,
DEFAULT_ISOLATION_LEVEL,
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
}
其中还有关于SSL和SASL的配置在.withClientSslSupport()
和.withClientSaslSupport()
中,涉及类SslConfigs
和SaslConfigs
。