Kafka Java API

1. Consumer
package com.asiainfo.group.kafka.consumer;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class ConsumerDemo {
	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/consumer/";
	private static KafkaConsumer<String, String> consumer;
	static{
		try {
			Properties p = new Properties();
			p.load(new FileReader(PATH+"consumer.properties"));
			consumer = new KafkaConsumer<String,String>(p);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	
	
	public static void main(String[] args){
		
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				List<String> topics = new ArrayList<String>();
				topics.add("okk");
				consumer.subscribe(topics);
				
				
				try {
					while(true){
						ConsumerRecords<String, String> records = consumer.poll(3000);
						System.err.println("收到了"+records.count()+"条消息");
						for (ConsumerRecord<String, String> record : records) {
							System.err.println("topic:"+record.topic());
							System.err.println("partition:"+record.partition());
							System.err.println("offset:"+record.offset());
							System.err.println("key:"+record.key());
							System.err.println("value:"+record.value());
						}
						//Synchronous commit: blocks, retrying until the commit succeeds
						//consumer.commitSync();
						
						//Asynchronous commit: does not retry, because by the time a retry ran,
						//a larger offset may already have been committed
						consumer.commitAsync(new OffsetCommitCallback() {
							@Override
							public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
								if (exception == null) {
									System.err.println("Async offset commit succeeded!");
								} else {
									System.err.println("Async offset commit failed: " + exception.getMessage());
								}
							}
						});
						
						//Combining sync and async commits:
						//In normal operation, use async commits; even if one commit fails, a later one will succeed.
						//When closing the consumer there is no "next commit", so fall back to a sync commit,
						//which retries until it succeeds, as in the code below:
						/*try{
							consumer.commitSync();
						} finally{
							consumer.close();
						}*/
					}
				} catch (Exception e) {
					e.printStackTrace();
				} finally{
					consumer.close();
				}
			}
		}).start();
		
		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		
		
		//wakeup() is the only KafkaConsumer method that is safe to call from another thread; it interrupts
		//consumer.poll by making it throw a WakeupException, which breaks out of while(true) and lands in
		//the finally block that closes the consumer
		consumer.wakeup();
	}

}
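
The combined commit strategy described in the comments above can be made concrete. The following is a minimal sketch, not part of the original demo: it assumes the same static consumer, a volatile boolean running flag that shutdown code sets to false before calling consumer.wakeup(), and a hypothetical process() handler; WakeupException comes from org.apache.kafka.common.errors.

try {
	while (running) {
		ConsumerRecords<String, String> records = consumer.poll(3000);
		for (ConsumerRecord<String, String> record : records) {
			process(record); // hypothetical business logic
		}
		consumer.commitAsync(); // fast path: fire-and-forget, no retries
	}
} catch (WakeupException e) {
	// expected: thrown inside poll() after consumer.wakeup() is called
} finally {
	try {
		consumer.commitSync(); // final commit: retries until it succeeds
	} finally {
		consumer.close();
	}
}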

 

2. Producer
package com.asiainfo.group.kafka.producer;

import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProductDemo {
	private static final String PATH="D:/development-software/eclipse-mars/default/java_test/src/main/java/com/asiainfo/group/kafka/producer/";
	private static KafkaProducer<String, String> producer;
	static{
		try {
			Properties p = new Properties();
			p.load(new FileReader(PATH+"producer.properties"));
			producer = new KafkaProducer<String,String>(p);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	
	
	/**
	 * Fire-and-forget: send the record without caring whether it arrives.
	 * @throws Exception
	 */
	public void sendAndForget() throws Exception{
		for (int i = 0; i < 10; i++) {
			ProducerRecord<String, String> record = new ProducerRecord<>("okk", "key"+i, "sendAndForget"+i);
			producer.send(record);
		}
		producer.close();
	}
	
	
	public void syncSend() throws Exception{
		long start = System.currentTimeMillis();
		for (int i = 0; i < 1000; i++) {
			ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "syncSend"+i);
			RecordMetadata recordMetadata = producer.send(record).get();
			System.err.println("Sync send succeeded! offset=" + recordMetadata.offset());
		}
		System.err.println("Sync send took: " + (System.currentTimeMillis() - start) + " ms");
		producer.close();
		
	}
	
	public void asyncSend(){
		long start = System.currentTimeMillis();
		for (int i = 0; i < 10; i++) {
			ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key"+i, "asyncSend"+i);
			producer.send(record, new Callback() {
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					if (exception == null) {
						System.err.println("Async send succeeded!");
					} else {
						System.err.println("Async send failed: " + exception.getMessage());
					}
				}
			});
			
		}
		//Note: this measures only how long it took to hand the records to the
		//producer's buffer; the callbacks themselves may fire later
		System.err.println("Async send took: " + (System.currentTimeMillis() - start) + " ms");
		producer.close();
	}
	
	@SuppressWarnings("resource")
	public static void main(String[] args) throws Exception{
		new ProductDemo().sendAndForget();
		//new ProductDemo().syncSend();
		//new ProductDemo().asyncSend();
	}

}
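
Both demos load their .properties files from an absolute path (the PATH constant), which breaks as soon as the project is moved. A more portable variant, sketched below under the assumption that the files are packaged on the classpath (e.g. under src/main/resources), loads them as resources instead:

import java.io.InputStream;
import java.util.Properties;

public class ConfigLoader {

	//Loads a properties file from the classpath; the name passed in
	//(e.g. "producer.properties") is assumed to be packaged as a resource
	static Properties load(String name) throws Exception {
		Properties p = new Properties();
		try (InputStream in = ConfigLoader.class.getClassLoader().getResourceAsStream(name)) {
			if (in == null) {
				throw new IllegalStateException(name + " not found on classpath");
			}
			p.load(in);
		}
		return p;
	}
}

The static initializers above would then become, for example, producer = new KafkaProducer<String,String>(ConfigLoader.load("producer.properties"));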
3. Producer configuration file
bootstrap.servers=192.168.0.108:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
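
Only the three required producer settings are shown above. Settings like the following are often added for durability and batching; they are optional, not part of the original file, and the values here are only illustrative:

acks=all
retries=3
linger.ms=5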
4. Consumer configuration file
bootstrap.servers=192.168.0.108:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=groupByJava1
enable.auto.commit=false
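
enable.auto.commit=false is what makes the manual commitSync/commitAsync calls in the consumer code necessary; left at its default of true, the consumer would commit offsets automatically in the background. One setting often added alongside it, optional and not part of the original file, controls where a new consumer group starts reading when it has no committed offset yet:

auto.offset.reset=earliest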

 
