kafka

一、安装kafka

1.需要安装jdk

2.安装kafka

这里用的kafka是二进制安装包

[root@oracle ~]# tar xf kafka_2.12-2.8.1.tgz  -C /usr/local/
[root@oracle ~]# cd /usr/local/
[root@oracle local]# mv kafka_2.12-2.8.1/ kafka
[root@oracle ~]# cd /usr/local/kafka/config

[root@oracle config]# vim server.properties 
log.dirs=/usr/local/kafka/logs/

#修改本机的监听IP地址,只修改端口和IP地址,其它格式都是固定的
listeners=PLAINTEXT://192.168.1.103:9092    

#配置zooker连接地址
zookeeper.connect=localhost:2181
    

#启动kafka
[root@oracle ~]# cd /usr/local/kafka/bin/
[root@oracle bin]# ./kafka-server-start.sh -daemon ../config/server.properties 
[root@oracle bin]# netstat -anpt |grep 9092 |grep -i listen
tcp6       0      0 192.168.1.188:9092      :::*        LISTEN      3135/java

二、kafka的基本概念

名称 解释
Broker 相当于LB,一个kafka节点就一个broker,一个或者多个broker可以组成一个kafka集群
Topic kafka根据topic对消息进行分类,发布到kakfa集群的每条消息都需要指定一个topic
Producer 消息生产者,向broker发送消息的客户端
Consumer 消息消费者,从broker读取消息的客户端
Consumer Group 每一个consumer属于一个特定的Consumer Group 一条消息可以被多个不同的consumer group消费,但是一个consumer group中只能有一个consumer能够消费该消息
Partition 一个topick可以分为多个partition 每个partition内部消息是有序的

三、kafka的基本操作

1.创建topic

[root@oracle bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 \
--replication-factor 1 --partitions 1 --topic test
#显示如下
Created topic test.

2.查看有哪些topic

[root@oracle bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
#结果如下:显示了刚才创建的test
test

3.发送消息(生产消息)

[root@oracle bin]# ./kafka-console-producer.sh --broker-list 192.168.1.188:9092 --topic test
>abc
>123
>hello

4.接收消息(消费消息)

4.1 方式1:

从最后一条消息的偏移量+1开始消费

[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 \ --topic test

4.2 方式2

从头开始消费

[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092\ 
--from-beginning --topic test

4.3 注意:

消息会被存储, 会存储到消息的日志文件中
消息是顺序存储
消息是有偏移量的
消费是可以指明偏移量进行消费的

四、kafka的单播和多播

1.单播消息

一个 “消费组” 里只会有一个消费者能消费到某一个topic中的消息。于是可以创建多个消费者。这些消费者在同一个消费组中

在两个终端中分别启动一个消费者,会发现只有一个消费者能收到消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property\ group.id=testgroup --topic test

2.多播消息

在一些业务场景中需要让一条消费者被多个消费者消费,那么就可以使用多播模式了,
kafka实现多播 只需要让不同的消费者处于不同的消费组即可

./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property group.id=testgroup1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092 --consumer-property group.id=testgroup2 --topic test

五、消费组

1.查看所有消费组

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092  --list
#显示结果如下
testgroup
testgroup1


解释:
 	--bootstrap-server 指定的是broker的地址,也就是kafka的地址

2.显示某一个消费组的信息

./kafka-consumer-groups.sh --bootstrap-server 10.130.222.13:9092 \
--group etooth \
--describe

解释:
	--group:指定消费组
	--describe:显示详细信息

显示结果解释:
current-offset: 当前消费到第几条消息
log-end-offset: 消息总量条数
lag: 还有多少条消息没有被消费

六、分区

topic(主题)和partition(分区)的区别
通过partition对一个topic中的消息分区来存储。这样的有点是:
1.分区存储,可以解决统一存储文件过大的问题
2.提供了读写的吞吐量,读取和写入可以同时在多个分区中进行

 ./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 \
--replication-factor 1 --partitions 3 --topic test

解释:
--partitions 3 这个参数就是指定在创建topic时要创建几个分区,这里就是创建了3个分区,索引从0开始,保存消息的日志也会分为3个

七、kafka集群

这里为了方便就使用1台机器

[root@oracle config]# cp server.properties server1.properties 
[root@oracle config]# cp server.properties server2.properties 
[root@oracle config]# cat server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.188:9092
log.dirs=/tmp/kafka-logs

[root@oracle config]# cat server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.188:9093
log.dirs=/tmp/kafka-logs1
[root@oracle config]# cat server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.188:9094
log.dirs=/tmp/kafka-logs2

启动另外两个节点

./kafka-server-start.sh -daemon ../config/server1.properties 
./kafka-server-start.sh -daemon ../config/server2.properties 

[root@oracle bin]# netstat -antp |egrep "9093|9094"
tcp6       0      0 192.168.1.188:9093      :::*                    LISTEN      7363/java           
tcp6       0      0 192.168.1.188:9094      :::*                    LISTEN      7771/java   

1.副本

副本主要是给分区进行备份
在集群中创建一个topic

创建了zhangsan-test得topic  3个副本 2

[root@oracle bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.188:2181 \
--replication-factor 3 --partitions 2 --topic zhangsan-test
显示结果如下:
Created topic zhangsan-test.

显示topic详细信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xElrtZey-1636375772607)(1kakfa%E7%9A%84%E5%AE%89%E8%A3%85.assets/image-20211107190516450.png)]

partition: 分区 这里得值说明这是这个topic得哪个分区
leader: 领导者,这里得值说明kafka得哪个节点是leader. leader得作用负责消息得读写,然后再将数据同步给其它得节点.消息得生产和消费都会对leader进行。
lsr:可以同步得broker节点和已同步得broker节点,存放在isr结合中

2.集群消息的消费和发送

2.1.向集群发送消息

[root@oracle bin]# ./kafka-console-producer.sh --broker-list 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test
>1
>2
>3

2.2.消费集群消息

[root@oracle bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --topic zhangsan-test --from-beginning

#消费到消息
1
2
3

2.3.不同得消费组消费集群消息

./kafka-console-consumer.sh --bootstrap-server \
192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 \
--topic zhangsan-test --from-beginning -consumer-property group.id=testgroup

./kafka-console-consumer.sh --bootstrap-server \
192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 \
--topic zhangsan-test --from-beginning -consumer-property group.id=testgroup1

一个分区只能被一个消费组中得某一个消费者消费

3.查看集群内所有的消费组

[root@oracle bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --list

4.查看某一个消费组的信息

[root@oracle bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.188:9092,192.168.1.188:9093,192.168.1.188:9094 --group testgroup --describe
上一篇:kafka 部署


下一篇:SpringMVC(九) RequestMapping请求参数