API

API

maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

Producer API

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // Callback可选
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                       e.printStackTrace();
                    } else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

bootstrap.servers 用于创建向kafka broker服务器的连接。集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。

acks 指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。配置项控制的是完成的标准,即什么样的请求被认为是完成了的。

如果acks=0 , 生产者在成功写入悄息之前不会等待任何来自服务器的响应。

如果acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。

如果acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

batch.size 生产者对每个分区都维护了一个buffers,其中放的是未被发送的记录。 这些buffers的大小是通过batch.size配置项来控制的。 batch.size是调优producer吞吐量和延时性能指标都有非常重要作用。 默认值16384即16KB。

linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。

buffer.memory 指定了producer端用于缓存消息的缓冲区的大小,单位是字节,默认值是33554432即32M。

key.serializer 被发送到broker端的任何消息的格式都必须是字节数组。 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。

value.serializer 和key.serializer类似。此被用来对消息体即消息value部分做序列化。 将消息value部分转换成字节数组。

几种发送方式

Kafaka有三种消息发送方式。

发送并忘记(fire-and-forget)

把消息发送给服务器,并不关心它是否正常到达。大多数情况下,消息会正常到达,因为Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
try{
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

同步发送

使用send()发送消息,它会返回Future对象,调用get()方法进行等待,就可以知道消息息是否发送成功。

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
try{
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

调用get()方法等待Kafka响应,如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,会得到一个RecordMetadata对象。

异步发送

调用send()方法,并指定一个回调函数, 服务器在返回响应时调用该函数。

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        ......
    }
});

Consumer API

Offsets and Consumer Position

kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。例如,一个位置是5的消费者(说明已经消费了0到4的消息),下一个接收消息的偏移量为5的消息。实际上有两个与消费者相关的“位置”概念:

消费者的位置给出了下一条记录的偏移量。它比消费者在该分区中看到的最大偏移量要大一个。 它在每次消费者在调用poll(long)中接收消息时自动增长。

那么消费者是如何提交偏移量的呢?消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果悄费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制(如:commitSync 和 commitAsync)。这个区别是消费者来控制一条消息什么时候才被认为是已被消费的,控制权在消费者。

Automatic Offset Committing

public class ComsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // 消费者组
        props.put("group.id", "test_group");
        // 设置enable.auto.commit,偏移量由auto.commit.interval.ms控制自动提交的频率
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅topic。调用kafkaConsumer.subscribe方法订阅consumer group所需的topic列表
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}

设置enable.auto.commit意味着自动提交已消费的记录的offset。

这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向Kafka请求数据。消费者必须持续对Kafka进行轮询,否则会被认为己经死亡,它的分区会被移交给群组里的其他消费者。传给poll()方法的参数是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为0,poll()会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据。

在退出应用程序之前使用close()方法关闭消费者。网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳井认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

Manual Offset Control

public class ComsumerTestManualOffset {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test_goup");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    insertIntoDb(buffer);
                    consumer.commitAsync();
                    buffer.clear();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
        
    }
}

代替消费者周期性的提交已消费的offsets,用户可以控制什么时候记录被认为是已经消费并提交它们的offsets。 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。

这个例子使用了同步和异步组合提交。般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

直接关闭消费者,就没有所谓的“下一次提交”了。使用commitSync()方法会一直重试,直到提交成功或发生无法恢复的错误。

提交特定的偏移量

再均衡监听器

API

上一篇:BAPI_GOODSMVT_CREATE(调拨 收货 发货 入库 退货)


下一篇:WARN node unsupported "node@v6.11.2" is ......(windows系统更新node版本)