大数据基础系列之kafka011生产者缓存超时,幂等性和事务实现

大数据基础系列之kafka011生产者缓存超时,幂等性和事务实现

 浪尖 浪尖聊大数据

一,demo及相关类

1,基本介绍

KafkaProducer是线程安全的,多线程间共享一个实例比共享多个实例更加高效。首先搞一个demo

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic",Integer.toString(i), Integer.toString(i)));

producer.close();

2,ProducerRecord

发往kafka的key/value对。由topic,分区id(可选),key(可选),timestamp(可选),value组成。

如果一个有效的分区ID被指定,Record就会被发到指定的分区。如果,没指定分区id,只指定了key,就会按照key做hash后对分区数取余得到的数值作为分区的id。如果分区id,和key都没有指定,就会以轮训的形式发送Records。

Record还有一个timestamp属性。如果用户没有提供timestamp,生产者将会使用当前时间作为Record的timestamp。Kafka最终使用的时间戳取决于topic配置的时间类型。

1),如果topic配置使用了CreateTime,Broker就会使用生产者生产Record时带的时间戳。

2),如果topic配置使用了LogAppendTime,Record追加到log的时候,Broker会有本地时间代替Producer生产时带的时间戳。

无论是采用的上文中的哪种形式,timestamp都会被包含在RecordMetadata中返回。

ProducerRecord(String topic, Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, K key, V value,Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
  ProducerRecord(String topic, K key, V value)
Create a record to be sent to Kafka
  ProducerRecord(String topic, V value)
Create a record with no key

二,缓存和超时

生产者内部有一个buffer,用来缓存Record,同时内部有一个后台线程负责将Record转化为请求,然后将请求发给kafka集群。使用生产者后未关闭,会导致这些资源泄漏。

send方法是异步的。调用他实际上是将Record添加到Buffer中,然后立即返回。这使得生产者可以批量提交消息来提升性能。

acks配置控制发送请求完成的标准。如果设置成all,将会导致生产阻塞,等待所有副本提交日志成功后才算发送完成,超级低效但是可以最大限度的容错。

如果请求失败,生产者会自动尝试,前提是不要设置retries为零。当然,开启失败尝试也就意味着带来了数据重复发送的风险。

生产者为每个分区维护一个buffer,这个buffer的大小由batch.size指定,该值越大表示批量发送的消息数越多,也意味着需要更大的内存。内存数可以估计的。

默认情况下,即使buffer还有剩余的空间没有填充,消息也会被立即发送。如果你想减少请求的次数,可以设置linger.ms参数为大于0的某一值。使生产者发送消息前等待linger.ms指定的时间,这样就可以有更多的消息加入到该batch来。这很像TCP中的Nagle原理。例如,在上面的代码片段中,由于我们设置linger.ms为1ms,100条消息可能在一次请求中全部发送到了Server端。然而,这也意味着加入消息一直不能填充满buffer,我们要延迟一毫秒。

buffer.memory决定者生产者所能用于buffer的总内存大小。如果,消息发送的速度比传输到Server端的速度快,这个buffer空间就会耗尽。当buffer空间耗尽,send调用就会阻塞,超过max.block.ms设置的超时时间后会抛出TimeoutException。

三,序列化

Key.serializer和value.serialize决定者如何将key和value对象转化为字节数组。你可以使用包括bytearrayserializer或stringserializer简单的字符串或字节类型。也可以实现自定义的序列化方式。

四,幂等性

从kafka0.11版本开始,Kafka支持两种额外的模式:幂等性生产者和事务生产者。幂等性强化消息的传递语义,从至少一次到仅仅一次。特别是生产者重试将不再导致消息重复发送。事务生产者允许应用程序将消息原子的发送到多个分区(和主题!)。

设置enable.idempotence为true来开启幂等性,如果设置了这个参数retries配置将会被设置为默认值,也即Integer.MAX_VALUE,max.inflight.requests.per.connection会被设置为1,acks会被设置为all。幂等性生产者不需要修改API,所以现有的应用程序不需要修改就可以使用该特性。

为了利用幂等生产者,必须避免应用程序级重新发送,因为这些不能被去重。例如,如果应用程序运行幂等性,建议不要设置retries,因为他会被设置为默认值(Integer.MAX_VALUE).此外,如果send(producerrecord)返回一个错误甚至无限重试(例如,如果消息送前缓冲区满了),建议关闭生产和检查最后产生消息的内容以确保不重复。

五,事务

为了使用事务生产者和相关的APIs,必须要设置transactional.id属性.如果设置了transactional.id幂等性会自动被启用。支持事务的topic必须要进行容错配置。特别的replication.factor应该设置为3,topic的min.insync.replicas配置必须设置为2.最后,为了从端到端实现事务性保证,必须配置消费者只读取committed 的消息。

transactional.id目的是单生产者实例能从多会话中恢复。该特性就是分区的,状态的应用程序程序中的一个碎片标识符。transactional.id值在一个分区的应用中每个消费者实例必须是唯一的。

所有新的事务性API都会被阻塞,将在失败时抛出异常。举一个简单的例子,一次事务中提交100条消息。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, newStringSerializer(), new StringSerializer());

producer.initTransactions();

try {
  producer.beginTransaction();
  for (int i = 0; i < 100; i++)
  producer.send(new ProducerRecord<>("my-topic", Integer.toString(i),Integer.toString(i)));
  producer.commitTransaction();
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  // We can't recover from these exceptions, so our only option is to close the producer and exit.
  producer.close();
catch (KafkaException e) {
  // For all other exceptions, just abort the transaction and try again.
  producer.abortTransaction();
}
producer.close();

就如例子一样,每个消费者只能有一个事务开启。在beginTransaction() 和commitTransaction()中间发送的所有消息,都是一次事务的一部分。

事务生产者使用execeptions进行错误状态交流。特别之处,我们不需要为producer.send指定回调函数。任何在事务中不可恢复的错误发生都会抛出一个KafkaException异常(http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord))。

在接受到一个kafkaexection异常之后,通过调用producer.abortTransaction(),可以保证所有的已经写入成功的消息会被标记为aborted,因此保证事务传输。

六,总结

本文主要是阐述缓存和超时机制,序列化及反序列化,幂等性生产者,事务生产者。大家可以根据需要进行选择.


上一篇:kafka的ack确认机制


下一篇:Python Flask高级编程之从0到1开发《鱼书》精品项目