kafka简介

1. 简介

  Kafka 是一个基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

1. 使用消息队列的好处:

1. 解耦:允许独立的修改或者扩展两遍的处理过程

2. 可恢复性:系统的一部分组件失效后,不会影响整个系统。

3. 缓冲:有助于控制和优化数据流经过系统的速度,解决生产者和消费者处理消息速度不一致的问题

4.灵活性&峰值处理:使用消息队列处理一下超过限流的任务

2. 消息队列的两种模式

1. 点对点模式:一对一,消费者主动拉取消息,消息收到后消息清除

2.发布/订阅模式:一对多,消费者消费消息后消息不会删除(保留是有期限的),也就是一条消息可以被多个消费者消费。

3. kafka架构

kafka简介

1》producer:消息生产者

2》consumer:消息消费者

3》consumer group:消费者组,由多个consumer组成。 消费者组内每个消费者负责消费不同分区的消息,一个分区只能由一个组内消费者消费;消费者之间互相不影响。所有的消费者都属于一个消费者组,组是一个逻辑上的一个订阅者。

4》broker:一台服务器就是一个broker。一个集群由多个broker组成。一个broker 有多个topic

5》topic:可以理解为一个主题。生产者和消费者面向的都是topic

6》partition:为了实现扩展性,一个大的topic可以分布到多个broker上,一个topic 可以分布到不同的partition,每个partition 是一个有序的队列。

7》replication:副本,每个partition 都有若干个replication,一个leader 和 多个follower

8》leader:每个分区多个副本的主,生产者生产的消息以及消费者消费的消息面向的都是leader

9》follower:每个分区部门的从节点,实时的和主进行同步,负责备份主节点的数据。leader 发生故障时,某个follower会成为新的leader。

2. kafka 安装

  下面基于docker 安装。

1. 安装zk

docker pull hub.c.163.com/cloudpri/zookeeper:latest

测试:

docker run -d --name zookeeper -p 2181:2181 -t hub.c.163.com/cloudpri/zookeeper

进入到容器执行如下命令:

cd /apache-zookeeper-3.5.5-bin/bin
zkCli.sh
ls /    查看目前存在的节点

2. 安装kafka

参考: https://github.com/wurstmeister/kafka-docker  

这个是使用docker-compose up -d 进行设置

1. 获取kafka 镜像

docker pull hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0

2. 编写docker-compose.yml

内容如下:

version: ‘2‘
services:
  zookeeper:
    image: hub.c.163.com/cloudpri/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

3. 测试kafka 进入docker-compose.yml 所在的目录执行如下

$ docker-compose up -d
Creating dockerkafka_zookeeper_1 ... done
Creating dockerkafka_kafka_1     ... done

4. 查看启动了两个进程

$ docker ps -a
CONTAINER ID        IMAGE                                            COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
ee89c3731105        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         2 minutes ago       Up 2 minutes        0.0.0.0:32772->9092/tcp                                dockerkafka_kafka_1
d7d2b0d3c93d        hub.c.163.com/cloudpri/zookeeper                 "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dockerkafka_zookeeper_1

5. 查看版本号

$ docker exec dockerkafka_kafka_1 find / -name \*kafka_\* | head -1 | grep -o ‘\kafka[^\n]*‘
kafka_2.12-2.3.0

可以看到相关版本

6. 查看zookeeper 版本

$ docker exec dockerkafka_zookeeper_1 pwd
/apache-zookeeper-3.5.5-bin

7. 扩展broker:

(1) 在docker-compose.yml所在的文件夹下,执行以下命令即可将borker总数从1个扩展到4个:

$ docker-compose scale kafka=4
WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
Starting dockerkafka_kafka_1 ... done
Creating dockerkafka_kafka_2 ... done
Creating dockerkafka_kafka_3 ... done
Creating dockerkafka_kafka_4 ... done

(2)  查看

$ docker ps -a
CONTAINER ID        IMAGE                                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
449364fdf044        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32775->9092/tcp                                dockerkafka_kafka_3
4965080e898d        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32774->9092/tcp                                dockerkafka_kafka_4
2ddebcf2aba8        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:32773->9092/tcp                                dockerkafka_kafka_2
ee89c3731105        hub.c.163.com/qingzhou/wurstmeister/kafka:v1.0   "start-kafka.sh"         30 minutes ago       Up 30 minutes       0.0.0.0:32772->9092/tcp                                dockerkafka_kafka_1
d7d2b0d3c93d        hub.c.163.com/cloudpri/zookeeper                 "/docker-entrypoint.…"   30 minutes ago       Up 30 minutes       2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dockerkafka_zookeeper_1

8. 使用kafka

需要进入一个容器

1》创建topic:

创建一个topic,名为 test1,4个partition,副本因子2,执行以下命令即可:

kafka-topics.sh --create --topic test1 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2

2》查看topic

bash-4.4# kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
test1

3》查看刚刚创建的topic的情况,borker和副本情况

bash-4.4# kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001
Topic:test1     PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: test1    Partition: 0    Leader: 1004    Replicas: 1004,1002     Isr: 1004,1002
        Topic: test1    Partition: 1    Leader: 1001    Replicas: 1001,1003     Isr: 1001,1003
        Topic: test1    Partition: 2    Leader: 1002    Replicas: 1002,1004     Isr: 1002,1004
        Topic: test1    Partition: 3    Leader: 1003    Replicas: 1003,1001     Isr: 1003,1001

9. 测试生产者和消费者

(1) 启动消费者

kafka-console-consumer.sh --topic topic001 --bootstrap-server dockerkafka_kafka_1:9092,dockerkafka_kafka_2:9092,dockerkafka_kafka_3:9092,dockerkafka_kafka_4:9092

启动后控制台不会打印消息,因为没有生产者消费消息。

(2) 启动生产者并且发送消息

kafka-console-producer.sh --topic topic001 --broker-list dockerkafka_kafka_1:9092,dockerkafka_kafka_2:9092,dockerkafka_kafka_3:9092,dockerkafka_kafka_4:9092

 

现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。

 

10. 到zk 中查看节点信息,如下:

kafka简介

 

kafka简介

上一篇:本周最新文献速递20210711


下一篇:mybtais-plus