<!--
  Apache Kafka artifact kafka_2.11, version 0.11.0.2.
  The exclusions below keep the listed transitive artifacts off the classpath:
  the Sun JMX tools / JMX RI, the javax JMS API, and both the SLF4J API and its
  log4j12 binding. Presumably the host application supplies its own logging
  stack; confirm before changing the SLF4J exclusions.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.2</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<!-- The ZooKeeper exclusion below is commented out, so ZooKeeper is NOT excluded here. -->
<!--<exclusion>-->
<!--<groupId>org.apache.zookeeper</groupId>-->
<!--<artifactId>zookeeper</artifactId>-->
<!--</exclusion>-->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
package kafka.dynamic.consumer;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Consumes one topic through Kafka's ZooKeeper-based high-level consumer API.
 *
 * <p>The constructor creates the {@link ConsumerConnector}; {@link #run()} asks it for
 * {@code consumerCount} message streams on the topic and hands each stream to its own
 * worker in a fixed-size thread pool. {@link #shutdown()} stops the connector and then
 * the pool.
 *
 * <p>{@code run()} and {@code shutdown()} are expected to be invoked from different
 * threads, which is why the pool reference is {@code volatile}.
 */
public class JavaKafkaConsumerHighAPI implements Runnable {

    /** ZooKeeper connect string used when the caller does not supply one. */
    private static final String DEFAULT_ZOOKEEPER_CONNECT = "127.0.0.1:2181";

    private final ConsumerConnector consumer;
    private final String topic;
    private final String groupId;
    private final Integer consumerCount;

    /** Assigned in {@link #run()}, read in {@link #shutdown()}; volatile for cross-thread visibility. */
    private volatile ExecutorService executorPool;

    /**
     * Creates a consumer that connects to ZooKeeper at {@code 127.0.0.1:2181}.
     *
     * @param topic         topic to consume
     * @param groupId       consumer group id
     * @param consumerCount number of streams requested for the topic, and the worker pool size
     */
    public JavaKafkaConsumerHighAPI(String topic, String groupId, Integer consumerCount) {
        this(topic, groupId, consumerCount, DEFAULT_ZOOKEEPER_CONNECT);
    }

    /**
     * Creates a consumer that connects to the given ZooKeeper ensemble.
     *
     * @param topic            topic to consume
     * @param groupId          consumer group id
     * @param consumerCount    number of streams requested for the topic, and the worker pool size
     * @param zookeeperConnect value for the {@code zookeeper.connect} consumer property
     */
    public JavaKafkaConsumerHighAPI(String topic, String groupId, Integer consumerCount, String zookeeperConnect) {
        this.topic = topic;
        this.groupId = groupId;
        this.consumerCount = consumerCount;
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeperConnect, groupId));
    }

    /**
     * Opens the message streams for the topic and submits one
     * {@link ConsumerKafkaStreamProcesser} per stream to a newly created fixed thread pool.
     * Returns as soon as the workers are submitted; it does not wait for them.
     */
    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.topic, consumerCount);

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

        ExecutorService pool = Executors.newFixedThreadPool(consumerCount);
        this.executorPool = pool;
        for (final KafkaStream<String, String> stream : streams) {
            pool.submit(new ConsumerKafkaStreamProcesser(stream, topic, groupId));
        }
    }

    /**
     * Shuts down the connector, then the worker pool, waiting up to five seconds for the
     * workers to finish. If the calling thread is interrupted while waiting, the interrupt
     * status is restored so callers further up the stack can still observe it.
     */
    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        ExecutorService pool = this.executorPool;
        if (pool != null) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly!!");
                // Do not swallow the interruption: re-assert the flag for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the consumer configuration.
     *
     * @param zookeeper value for {@code zookeeper.connect}
     * @param groupId   value for {@code group.id}
     * @return the configuration handed to the connector factory
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties prop = new Properties();
        prop.put("group.id", groupId);
        prop.put("zookeeper.connect", zookeeper);
        // NOTE(review): 400 ms is a very short ZooKeeper session timeout - confirm it is intended.
        prop.put("zookeeper.session.timeout.ms", "400");
        prop.put("zookeeper.sync.time.ms", "200");
        prop.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(prop);
    }

    /**
     * Worker that drains a single {@link KafkaStream} and prints every message it receives
     * to standard output.
     */
    public static class ConsumerKafkaStreamProcesser implements Runnable {
        private final KafkaStream<String, String> stream;
        private final String topic;
        private final String groupId;

        /**
         * @param stream  stream to read from
         * @param topic   topic name, used only in the printed output
         * @param groupId consumer group id, used only in the printed output
         */
        public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, String topic, String groupId) {
            this.stream = stream;
            this.topic = topic;
            this.groupId = groupId;
        }

        /**
         * Iterates the stream until {@code hasNext()} returns false or an exception is
         * thrown, printing offset, topic, group id and payload for each message.
         */
        @Override
        public void run() {
            try {
                ConsumerIterator<String, String> iter = this.stream.iterator();
                while (iter.hasNext()) {
                    // Parameterized instead of raw, so message() is statically a String.
                    MessageAndMetadata<String, String> value = iter.next();
                    System.out.println("____" + value.offset() + "____topic:" + topic + "____" + ",groupId=" + groupId + " ,value:" + value.message());
                }
                System.out.println("sd");
            } catch (Exception e) {
                System.out.println("000" + e.getMessage());
            }
            System.out.println("sdfsad");
        }
    }
}
/**
 * Handles a record from the {@code test} topic: parses its value as a {@code Bar} and,
 * if no consumer is registered yet under {@code bar.getName()}, starts a
 * {@link JavaKafkaConsumerHighAPI} on the record's topic using that name as group id.
 *
 * <p>At most {@code consumerCount} consumers are registered in {@code map}; once that
 * limit is reached, records carrying a new name are only reported via an error log.
 * Records with a null value or an unparseable payload are logged and skipped.
 *
 * @param record the incoming Kafka record
 * @throws Exception if parsing fails with anything other than a JSON processing error,
 *                   or if the consumer cannot be created
 */
@KafkaListener(groupId = "Main", topics = "test",containerFactory = "kafkaListenerContainerFactory")
public void listener(ConsumerRecord<?, ?> record) throws Exception{
    Object payload = record.value();
    if (payload == null) {
        // Nothing to parse; calling toString() on it would throw a NullPointerException.
        log.warn("Skipping record with null value: topic={}, offset={}", record.topic(), record.offset());
        return;
    }

    Bar bar;
    try {
        bar = new ObjectMapper().readValue(payload.toString(), Bar.class);
        log.info("Received notification: {}", bar);
    } catch (JsonProcessingException e) {
        // Previously swallowed silently; keep skipping the record but leave a trace.
        log.warn("Skipping record whose value could not be parsed as Bar", e);
        return;
    }

    String name = bar.getName();
    if (map.containsKey(name)) {
        // A consumer for this name already exists; nothing to start and nothing is lost.
        return;
    }

    // NOTE(review): map is a plain HashMap; if the listener container runs this method
    // on several threads, this check-then-put needs external synchronization - confirm.
    if (map.size() < consumerCount) {
        JavaKafkaConsumerHighAPI example1 = new JavaKafkaConsumerHighAPI(record.topic(), name, consumerCount);
        // Register the consumer so later records for the same name do not spawn duplicates
        // and so the consumerCount limit is actually enforced.
        map.put(name, example1);
        new Thread(example1).start();
    } else {
        // Message text: "Not enough consumers configured! Data is being lost."
        log.error("消费者配置数量不足!导致数据丢失");
    }
}
/**
 * Consumers keyed by name; the listener consults it (size and containsKey on
 * bar.getName()) before starting a new consumer.
 * NOTE(review): this is a plain HashMap exposed as a public static field - confirm it is
 * only accessed from a single thread, or guard access.
 */
public static Map<String,JavaKafkaConsumerHighAPI> map = new HashMap<>();
/**
 * Injected from the app.pusher.consumerCount property. The listener uses it both as the
 * upper bound on the size of {@code map} and as the consumerCount argument passed to each
 * JavaKafkaConsumerHighAPI it creates.
 */
@Value("${app.pusher.consumerCount}")
public Integer consumerCount;