1、POM文件导入
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.1</version> </dependency>
2、yml配置文件这样写
kafka: consumer: enable-auto-commit: true group-id: kafkaProducer auto-commit-interval: 1000 auto-offset-reset: latest bootstrap-servers: ip:port key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer concurrency: 3
3、kafka配置类
package com.harzone.kafka; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Properties; @Component public class KafkaConsumerConfig implements ApplicationRunner { @Value("${kafka.consumer.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; private final String topic = "test_producer"; Properties props; private KafkaConsumer<String, String> kafkaConsumer; private Properties initProperties() { // zookeeper 配置 props = new Properties(); props.put("bootstrap.servers", servers); // group 代表一个消费组 props.put("group.id", "kafkaProducer"); props.put("session.timeout.ms", "30000"); // 往zookeeper上写offset的频率 props.put("auto.commit.interval.ms", "1000"); // key的反序列化类型 props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; } @Override public void run(ApplicationArguments args) throws Exception { //初始化kafka链接 initProperties(); //Runtime.getRuntime().availableProcessors()线程数量 KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, topic, Runtime.getRuntime().availableProcessors() - 1); consumerThread.start(); } } 4、kafka多线程拉取实现
package com.harzone.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Dedicated polling thread: continuously polls a Kafka topic and hands each
 * non-empty batch of records to a fixed-size worker pool for processing
 * (the actual record handling lives in {@code RecordsHandler}).
 *
 * NOTE(review): with enable-auto-commit=true, offsets may be committed
 * before a submitted batch has finished processing, so records can be lost
 * on a crash — confirm this at-most-once behavior is acceptable.
 */
public class KafkaConsumerThread extends Thread {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    private final KafkaConsumer<String, String> kafkaConsumer;
    // Worker pool that processes fetched batches off the polling thread.
    private final ExecutorService executorService;
    // Number of worker threads (core == max, so the pool is fixed-size).
    private final int threadNumber;

    /**
     * @param properties          fully-built Kafka consumer configuration
     * @param topic               topic to subscribe to (group management mode)
     * @param availableProcessors number of worker threads for record handling
     */
    public KafkaConsumerThread(Properties properties, String topic, int availableProcessors) {
        kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // subscribe() uses consumer-group partition assignment (as opposed to
        // assign(), which pins explicit partitions). singletonList: immutable.
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        this.threadNumber = availableProcessors;
        // Fixed-size pool with a bounded 1000-slot queue. CallerRunsPolicy
        // makes the polling thread process a batch itself when the queue is
        // full, which naturally throttles the poll loop instead of dropping
        // batches or throwing RejectedExecutionException.
        executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Poll loop: blocks up to 10 ms per poll and dispatches non-empty batches
     * to the worker pool. Runs until an exception escapes the loop, then
     * closes the consumer and shuts down the pool.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // poll() blocks for AT MOST 10 ms waiting for records; it is
                // a timeout, not a fixed polling interval.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10));
                if (!records.isEmpty()) {
                    executorService.submit(new RecordsHandler(records));
                }
            }
        } catch (Exception e) {
            // Was e.printStackTrace(); use the declared logger so the failure
            // reaches the application logs with its stack trace.
            log.error("Kafka consumer loop terminated unexpectedly", e);
        } finally {
            kafkaConsumer.close();
            // Previously leaked: let queued batches finish, accept no new work.
            executorService.shutdown();
        }
    }
}