package com.asiainfo.group.kafka.consumer;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
/**
 * Demo of a Kafka consumer that commits offsets manually.
 *
 * <p>A worker thread subscribes to the topic {@code okk}, polls in a loop, prints every
 * record and commits asynchronously after each batch. After ten seconds the main thread
 * calls {@link KafkaConsumer#wakeup()} to break the worker out of {@code poll}; the worker
 * then performs one last synchronous commit and closes the consumer.
 */
public class ConsumerDemo {
    private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";

    private static final KafkaConsumer<String, String> consumer;

    /**
     * Set by the main thread immediately before it calls {@code wakeup()}, so the worker
     * can tell an orderly shutdown apart from an unexpected failure.
     */
    private static volatile boolean shuttingDown = false;

    static {
        Properties p = new Properties();
        // try-with-resources so the reader is closed even when loading fails.
        try (FileReader reader = new FileReader(PATH+"consumer.properties")) {
            p.load(reader);
        } catch (Exception e) {
            // Fail fast: without its configuration the consumer cannot be built, and
            // continuing would only produce a NullPointerException later on.
            throw new ExceptionInInitializerError(e);
        }
        consumer = new KafkaConsumer<String, String>(p);
    }

    /**
     * Starts the polling thread, lets it run for ten seconds, then wakes the consumer up
     * so that the polling thread shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                pollLoop();
            }
        }).start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to shut the consumer down.
            Thread.currentThread().interrupt();
        }

        // wakeup() is the only way to interrupt consumer.poll(): it makes poll throw,
        // which leaves the while(true) loop and reaches the finally block that closes
        // the consumer.
        shuttingDown = true;
        consumer.wakeup();
    }

    /** Subscribes to the topic, then polls, prints and commits until an exception ends the loop. */
    private static void pollLoop() {
        List<String> topics = new ArrayList<String>();
        topics.add("okk");
        consumer.subscribe(topics);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(3000);
                System.err.println("收到了"+records.count()+"条消息");
                for (ConsumerRecord<String, String> record : records) {
                    System.err.println("topic:"+record.topic());
                    System.err.println("partition:"+record.partition());
                    System.err.println("offset:"+record.offset());
                    System.err.println("key:"+record.key());
                    System.err.println("value:"+record.value());
                }
                // Synchronous commit (consumer.commitSync()) would retry until it
                // succeeds and block meanwhile. The asynchronous commit used here does
                // not retry, because a retry could overwrite a larger offset committed
                // in the meantime. A failed async commit is tolerable while running: a
                // later commit will cover it.
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        // The callback fires for failures too; a non-null exception
                        // means the commit did not go through.
                        if (exception != null) {
                            System.err.println("异步提交偏移量失败:"+exception);
                        } else {
                            System.err.println("异步提交偏移量成功!");
                        }
                    }
                });
            }
        } catch (Exception e) {
            if (shuttingDown) {
                // Expected: poll was interrupted by wakeup() from the main thread.
                System.err.println("消费者被唤醒,准备关闭:"+e);
            } else {
                e.printStackTrace();
            }
        } finally {
            // There is no "next commit" once the consumer is closing, so commit
            // synchronously one last time. Every record returned by poll has been
            // printed before any call that can throw, so the current position only
            // covers processed records.
            try {
                consumer.commitSync();
            } catch (Exception e) {
                System.err.println("关闭前同步提交偏移量失败:"+e);
            } finally {
                consumer.close();
            }
        }
    }
}
2、生产者
package com.asiainfo.group.kafka.producer;
import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Demo of the three ways to send records with a Kafka producer: fire-and-forget,
 * synchronous, and asynchronous with a callback.
 *
 * <p>All three methods share one static producer and close it when they finish, so only
 * one of them can be run per JVM.
 */
public class ProductDemo {
    private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/";

    private static final KafkaProducer<String, String> producer;

    static {
        Properties p = new Properties();
        // try-with-resources so the reader is closed even when loading fails.
        try (FileReader reader = new FileReader(PATH+"producer.properties")) {
            p.load(reader);
        } catch (Exception e) {
            // Fail fast: without its configuration the producer cannot be built, and
            // continuing would only produce a NullPointerException later on.
            throw new ExceptionInInitializerError(e);
        }
        producer = new KafkaProducer<String, String>(p);
    }

    /**
     * Fire-and-forget: sends ten records to topic {@code okk} without checking whether
     * they arrive, then closes the producer.
     *
     * @throws Exception if sending fails before the record is handed to the producer
     */
    public void sendAndForget() throws Exception {
        try {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i);
                producer.send(record);
            }
        } finally {
            // Close even when a send throws, so the producer is not leaked.
            producer.close();
        }
    }

    /**
     * Synchronous send: sends 1000 records to topic {@code test1}, waiting for each
     * acknowledgement before sending the next, prints the elapsed time in milliseconds,
     * then closes the producer.
     *
     * @throws Exception if a send fails or the wait for its result is interrupted
     */
    public void syncSend() throws Exception {
        long start = System.currentTimeMillis();
        try {
            for (int i = 0; i < 1000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i);
                // get() blocks until the send completes and throws if it failed.
                producer.send(record).get();
                System.err.println("同步发送成功!");
            }
            System.err.println("同步发送耗时:"+(System.currentTimeMillis()-start));
        } finally {
            producer.close();
        }
    }

    /**
     * Asynchronous send: sends ten records to topic {@code test1} with a completion
     * callback, prints the time spent handing them over, then closes the producer.
     */
    public void asyncSend() {
        long start = System.currentTimeMillis();
        try {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // The callback fires for failures too; a non-null exception
                        // means the record was not sent.
                        if (exception != null) {
                            System.err.println("异步发送失败:"+exception);
                        } else {
                            System.err.println("异步发送成功!");
                        }
                    }
                });
            }
            System.err.println("异步发送耗时:"+(System.currentTimeMillis()-start));
        } finally {
            producer.close();
        }
    }

    /**
     * Runs the fire-and-forget demo; the other two modes are left commented out because
     * each one closes the shared producer.
     *
     * @param args unused
     * @throws Exception if the selected send method fails
     */
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
        new ProductDemo().sendAndForget();
        //new ProductDemo().syncSend();
        //new ProductDemo().asyncSend();
    }
}
3、生产者配置文件
# Kafka broker (host:port) the producer connects to.
bootstrap.servers=192.168.0.108:9092
# Keys and values are both sent as Strings, matching KafkaProducer<String, String>.
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
4、消费者配置文件
# Kafka broker (host:port) the consumer connects to.
bootstrap.servers=192.168.0.108:9092
# Keys and values are both read as Strings, matching KafkaConsumer<String, String>.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group this consumer belongs to.
group.id=groupByJava1
# Auto-commit is disabled; the consumer code commits offsets itself.
enable.auto.commit=false