kafka使用

文章目录

术语简介

  1. broker :Kafka 集群包含一个或多个服务器,服务器节点称为broker。 【存储topic的数据】

  2. Topic:消息都有一个类别,这个类别被称为Topic 【类似于表名】

  3. partition: topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。

  4. Leader: 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

  5. Follower:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步

  6. Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障

kafka使用

Consumer Group

使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行*的分组而不需要多次发送消息到不同的Topic。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。

Consumer Group中各个consumer是根据先后启动的顺序有序消费一个topic的所有partitions的。

如果Consumer Group中所有consumer的总线程数大于partitions数量,则可能consumer thread或consumer会出现空闲状态

基础命令

一、 创建topic

bin/kafka-topics.sh --create --zookeeper 【zookeeper连接地址】:2181 【zookeeper连接地址】:2181 --replication-factor 3【分区数】 --partitions 3【备份数】 --topic topic的名字

二、 查看zookeeper的连接地址

grep -Ev ‘$|#’ config/server.properties

三、 查看所有的topic

bin/kafka-topics.sh --zookeeper 【zookeeper连接地址】:2181 --list

四 查看某一个topic的详细信息

bin/kafka-topics.sh --topic 【topic的名字】 --describe --zookeeper 【zookeeper连接地址】:2181

五 发送消息(生产一个消息)

bin/kafka-console-producer.sh --broker-list 【zookeeper连接地址】:9092 --topic 【目标topic】

六 启动消费者(消费消息)

bin/kafka-console-consumer.sh --bootstrap-server 【zookeeper连接地址】:9092 --topic 【目标topic】 --from-beginning

java使用

导包

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

#入库数据topic
data.collection.kafka.topic = data_collection_test_dev       topic名
data-collection.kafka.group = data_collection_group_test_dev    组名
data-collection.max-partition-fetch-bytes = 104857600  拉取最大的容量
data-collection.max-poll-records = 100000 拉取最大的条数
data-collection.fetch-max-bytes = 104857600 fetch拉取最大的容量
data-collection.max-poll-interval-ms = 600000  默认时间5分钟,需手动调大


#入库kafka批量消费配置
spring.kafka.listener.type = batch
spring.kafka.listener.concurrency = 3

配置类

@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${data-collection.kafka.group}")
    private String consumerGroup;

    /**
     * partition拉取最大的容量
     */
    @Value("${data-collection.max-partition-fetch-bytes:104857600}")
    private int maxPartitionFetchBytes;

    /**
     * 拉取最大的条数
     */
    @Value("${data-collection.max-poll-records:100000}")
    private int maxPollRecords;

    /**
     * fetch拉取最大的容量
     */
    @Value("${data-collection.fetch-max-bytes:104857600}")
    private int maxFetchBytes;

    /**
     *
     */
    @Value("${data-collection.max-poll-interval-ms:600000}")
    private int maxPollIntervalMS;

    private static String SESSION_TIMEOUT_MS = "session.timeout.ms";
    private static String KEBEROS_ENABLE = "keberos-enable";
    private static String SASL_MECHANISM = "sasl.mechanism";
    private static String SECURITY_PROTOCOL = "security.protocol";
    private static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";


    /**
     * 配置批量消费
     *
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        factory.setBatchListener(true);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = kafkaProperties.getConsumer().buildProperties();
        Map<String, String> properties = kafkaProperties.getProperties();
        if (!properties.isEmpty()) {
            properties.forEach(props::put);
        }
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxFetchBytes);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS);
        if (properties.containsKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)) {
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, properties.get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
        }
        if (properties.containsKey(KEBEROS_ENABLE) && Boolean.valueOf(properties.containsKey(KEBEROS_ENABLE)).equals(Boolean.TRUE)) {
            props.put(SASL_MECHANISM, properties.get(SASL_MECHANISM));
            props.put(SECURITY_PROTOCOL, properties.get(SECURITY_PROTOCOL));
            props.put(SASL_KERBEROS_SERVICE_NAME, properties.get(SASL_KERBEROS_SERVICE_NAME));
        }
        return props;
    }

    
}

生产消息

@Autowired
    private KafkaTemplate kafkaTemplate;

   @Value("${data.collection.kafka.topic}")
    private String kafkaTopic;
//生产数据	  topic名    数据(JSON格式)
kafkaTemplate.send(kafkaTopic, jsonObject.toJSONString());

消费消息

@KafkaListener(topics = {"${data.collection.mail.info.kafka.topic}"}, containerFactory = "batchFactory")
 public void listen(List<ConsumerRecord<String, String>> records) {
 。。。实现逻辑
 }
上一篇:用数据泵技术实现逻辑备份Oracle 11g R2 数据泵技术详解(expdp impdp)


下一篇:卡夫卡消息队列