一 生产者基本API
创建主题
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --topic first --replication-factor 2 --partitions 2
生产者代码
1 public class MyProducer { 2 3 public static void main(String[] args) { 4 5 //所有配置的键在ProducerConfig中都有 6 Properties props = new Properties(); 7 8 //kafka 集群,broker-list 9 props.put("bootstrap.servers", "hadoop102:9092"); 10 //应答级别。all等同于-1 11 props.put("acks", "all"); 12 //重试次数 13 props.put("retries", 1); 14 //批次大小和等待时间。16k发送一次,或者1毫秒发送一次(写到缓冲区) 15 props.put("batch.size", 16384); 16 props.put("linger.ms", 1); 17 //RecordAccumulator 缓冲区大小 18 props.put("buffer.memory", 33554432); 19 //KV序列化的类 20 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 21 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 22 23 //生产者 24 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 25 26 //发送数据 27 for (int i = 0; i < 5; i++) { 28 ProducerRecord<String,String> producerRecord = new ProducerRecord<>("first","atguigu" + i); 29 kafkaProducer.send(producerRecord); 30 } 31 32 //关闭资源。上面发送的5条消息,既没有16k,也不到1毫秒,可能不会发送。关闭才发送送。 33 kafkaProducer.close(); 34 35 } 36 }
控制台消费查看
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu1
atguigu3
atguigu0
atguigu2
atguigu4
二 带回调函数的API
生产者代码
1 public class CallbackProducer { 2 3 public static void main(String[] args) { 4 5 Properties props = new Properties(); 6 7 //只配置这三项,其他用默认配置 8 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); 9 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 10 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 11 12 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 13 14 for (int i = 0; i < 10; i++) { 15 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", "atguigu" + i); 16 17 Callback callback = (recordMetadata, e) -> { 18 if (e == null) { 19 System.out.println("发送成功!分区:" + recordMetadata.partition() + ",偏移量:" + recordMetadata.offset()); 20 }else { 21 e.printStackTrace(); 22 } 23 }; 24 25 kafkaProducer.send(producerRecord, callback); 26 } 27 28 kafkaProducer.close(); 29 } 30 }
控制台打印如下,可以看到第一个批次发往分区0,第二个批次发往分区1,且每个分区都单独有自己的偏移量。由于示例1中发送了5条数据,占用了分区0的偏移位置0和1,分区1的0,1,2。
发送成功!分区:0,偏移量:2 发送成功!分区:0,偏移量:3 发送成功!分区:0,偏移量:4 发送成功!分区:0,偏移量:5 发送成功!分区:0,偏移量:6 发送成功!分区:1,偏移量:3 发送成功!分区:1,偏移量:4 发送成功!分区:1,偏移量:5 发送成功!分区:1,偏移量:6 发送成功!分区:1,偏移量:7