头歌educoder-kafka入门篇
第一关:Kafka-初体验
#!/bin/bash
# Basic topic management with kafka-topics.sh, using the ZooKeeper at 127.0.0.1:2181.
# All long options use the double-dash form for consistency.

# 1. Create a topic named "demo" with 3 partitions and a replication factor of 1.
kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo
# 2. List all topics.
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
# 3. Show the details of the topic named "demo".
kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic demo
第二关:生产者-简单模式
package net.educoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Kafka producer, simple mode: sends 100 string messages to the "demo" topic.
 */
public class App {

    /**
     * Configures a producer, sends the numbers 0..99 (each used as both key and value)
     * to the "demo" topic, and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Create the configuration object; Properties is the usual choice.
        Properties props = new Properties();

        // 2. Set the Kafka parameters:
        //    bootstrap.servers -> Kafka connection address
        //    acks              -> accepts 1, -1 or 0; this exercise uses 1
        //    key/value serializer -> StringSerializer, since keys and values are strings
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. Build the KafkaProducer. try-with-resources guarantees close() runs
        //    even when send() throws, so the producer is never leaked.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = String.valueOf(i);
                ProducerRecord<String, String> record = new ProducerRecord<>("demo", payload, payload);

                // 4. Send the message.
                producer.send(record);
            }
        }
    }
}
第三关:消费者-自动提交偏移量
package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Kafka consumer with automatic offset commits: reads from the "demo" topic and
 * prints a running counter for every record consumed.
 */
public class App {

    /**
     * Subscribes to the "demo" topic and polls until at least 10 records have been
     * printed, then closes the consumer and returns.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // 1. Address of the Kafka cluster.
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // 2. Consumer group; the name is arbitrary, consumers sharing a name share a group.
        props.put("group.id", "g1");
        // 3. Enable automatic offset commits.
        props.put("enable.auto.commit", "true");
        // 4. Interval between automatic commits, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // 5. Deserializers for keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 6. Create the consumer. try-with-resources closes it on every exit path;
        //    previously the method returned without ever calling close().
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 7. Subscribe to the topic.
            consumer.subscribe(Arrays.asList("demo"));

            // 1-based index of the next record to print. The exit condition is checked
            // only after a whole poll batch is printed, so more than 10 lines may appear.
            int nextIndex = 1;
            while (nextIndex <= 10) {
                // 8. Poll for records, waiting up to 100 ms.
                ConsumerRecords<String, String> crs = consumer.poll(100);
                for (ConsumerRecord<String, String> ignored : crs) {
                    System.out.println("consume data:" + nextIndex);
                    nextIndex++;
                }
            }
        }
    }
}
第四关:消费者-手动提交偏移量
package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
 * Kafka consumer with manual offset commits: buffers records from the "demo" topic
 * and commits offsets only after a full batch has been processed.
 */
public class App {

    /**
     * Subscribes to the "demo" topic, polls until at least {@code minBatchSize} records
     * are buffered, prints them, commits the offsets synchronously and returns.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // 1. Address of the Kafka cluster.
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // 2. Consumer group; the name is arbitrary, consumers sharing a name share a group.
        props.put("group.id", "g1");
        // 3. Disable automatic offset commits; offsets are committed explicitly below.
        props.put("enable.auto.commit", "false");
        // 4. Deserializers for keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final int minBatchSize = 10;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

        // 5. Instantiate the consumer. try-with-resources closes it on every exit path;
        //    previously the method returned without ever calling close().
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 6. Subscribe to the topic named "demo".
            consumer.subscribe(Arrays.asList("demo"));

            // Keep polling (up to 100 ms per call) until a full batch is buffered.
            while (buffer.size() < minBatchSize) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
            }

            // Process the batch first, so offsets are committed only for handled records.
            for (ConsumerRecord<String, String> buffered : buffer) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        buffered.offset(), buffered.key(), buffered.value());
            }

            // 7. Commit the offsets manually.
            consumer.commitSync();
        }
    }
}