Kafka 入门

Kafka概念

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka名词解释

名词 解释
Broker MQ服务器端,消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
Producer MQ消息生产者,向Broker服务端发送消息的客户端
Consumer MQ消息消费者,从Broker服务端读取消息的客户端
Consumer Group 消费者组内每个消费者负责消费不同分区的数据,每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition 分区存放消息,一个topic可以分为多个partition
Offset 标记我们消费者消费的位置

Kafka 命令

创建topic

  • zookeeper:kafka 连接的zookeeper地址
  • replication-factor:用来设置主题的副本数,备份因子
  • partitions 分区数量
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

  • bootstrap-server:kafka Broker地址
.\kafka-topics.bat --list --bootstrap-server 127.0.0.1:9092

删除主题

  • bootstrap-server:kafka Broker地址
  • topic:topic名称
./kafka-topics.bat --delete --bootstrap-server 127.0.0.1:9092 --topic test

发送消息

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 
>send a Msg

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

SpringBoot 集成

maven 依赖

 <dependency>
    <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     <version>2.2.4.RELEASE</version>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.1.0</version>
 </dependency>

消费者

/**
 * 发送消息的方法
 *
 * @param key  推送数据的key
 * @param data 推送数据的data
 */
private void send(String key, String data) {
    // topic 名称 key data 消息数据
    kafkaTemplate.send("topicName", key, data);

}

生产者

@KafkaListener(topics = "topicName", groupId = "group1")
public void receive01(ConsumerRecord<?, ?> consumer) {
    log.info(">topic名称:{},,key:{},分区位置:{},offset{},value:{}<",
            consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
}

配置文件

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: group1
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

Offset Reset 三种模式

  • earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest(最新):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
上一篇:webService总结(三)——使用CXF + Spring发布webService


下一篇:LeetCode算法题-Happy Number(Java实现)