文章目录
术语简介
broker :Kafka 集群包含一个或多个服务器,服务器节点称为broker。 【存储topic的数据】
Topic:消息都有一个类别,这个类别被称为Topic 【类似于表名】
partition: topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。
Leader: 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障
Consumer Group
使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行*的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
Consumer Group中各个consumer是根据先后启动的顺序有序消费一个topic的所有partitions的。
如果Consumer Group中所有consumer的总线程数大于partitions数量,则可能consumer thread或consumer会出现空闲状态。
基础命令
一、 创建topic
bin/kafka-topics.sh --create --zookeeper 【zookeeper连接地址】:2181 【zookeeper连接地址】:2181 --replication-factor 3【分区数】 --partitions 3【备份数】 --topic topic的名字
二、 查看zookeeper的连接地址
grep -Ev ‘$|#’ config/server.properties
三、 查看所有的topic
bin/kafka-topics.sh --zookeeper 【zookeeper连接地址】:2181 --list
四 查看某一个topic的详细信息
bin/kafka-topics.sh --topic 【topic的名字】 --describe --zookeeper 【zookeeper连接地址】:2181
五 发送消息(生产一个消息)
bin/kafka-console-producer.sh --broker-list 【zookeeper连接地址】:9092 --topic 【目标topic】
六 启动消费者(消费消息)
bin/kafka-console-consumer.sh --bootstrap-server 【zookeeper连接地址】:9092 --topic 【目标topic】 --from-beginning
java使用
导包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置文件
#入库数据topic
data.collection.kafka.topic = data_collection_test_dev topic名
data-collection.kafka.group = data_collection_group_test_dev 组名
data-collection.max-partition-fetch-bytes = 104857600 拉取最大的容量
data-collection.max-poll-records = 100000 拉取最大的条数
data-collection.fetch-max-bytes = 104857600 fetch拉取最大的容量
data-collection.max-poll-interval-ms = 600000 默认时间5分钟,需手动调大
#入库kafka批量消费配置
spring.kafka.listener.type = batch
spring.kafka.listener.concurrency = 3
配置类
@Configuration
@EnableKafka
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Value("${data-collection.kafka.group}")
private String consumerGroup;
/**
* partition拉取最大的容量
*/
@Value("${data-collection.max-partition-fetch-bytes:104857600}")
private int maxPartitionFetchBytes;
/**
* 拉取最大的条数
*/
@Value("${data-collection.max-poll-records:100000}")
private int maxPollRecords;
/**
* fetch拉取最大的容量
*/
@Value("${data-collection.fetch-max-bytes:104857600}")
private int maxFetchBytes;
/**
*
*/
@Value("${data-collection.max-poll-interval-ms:600000}")
private int maxPollIntervalMS;
private static String SESSION_TIMEOUT_MS = "session.timeout.ms";
private static String KEBEROS_ENABLE = "keberos-enable";
private static String SASL_MECHANISM = "sasl.mechanism";
private static String SECURITY_PROTOCOL = "security.protocol";
private static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
/**
* 配置批量消费
*
* @return
*/
@Bean
public KafkaListenerContainerFactory batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
factory.setBatchListener(true);
factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
return factory;
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = kafkaProperties.getConsumer().buildProperties();
Map<String, String> properties = kafkaProperties.getProperties();
if (!properties.isEmpty()) {
properties.forEach(props::put);
}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxFetchBytes);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS);
if (properties.containsKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)) {
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, properties.get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
}
if (properties.containsKey(KEBEROS_ENABLE) && Boolean.valueOf(properties.containsKey(KEBEROS_ENABLE)).equals(Boolean.TRUE)) {
props.put(SASL_MECHANISM, properties.get(SASL_MECHANISM));
props.put(SECURITY_PROTOCOL, properties.get(SECURITY_PROTOCOL));
props.put(SASL_KERBEROS_SERVICE_NAME, properties.get(SASL_KERBEROS_SERVICE_NAME));
}
return props;
}
}
生产消息
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${data.collection.kafka.topic}")
private String kafkaTopic;
//生产数据 topic名 数据(JSON格式)
kafkaTemplate.send(kafkaTopic, jsonObject.toJSONString());
消费消息
@KafkaListener(topics = {"${data.collection.mail.info.kafka.topic}"}, containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, String>> records) {
。。。实现逻辑
}