二、Kafka与ZK安装

目录

一、安装Zookerper

二、安装Kafka

2.1安装

2.2文件解读

2.3 文件内容

三、测试

1、 创建主题

2、查看主题输入:

2、删除主题

3、 创建生产者

4、 创建消费者

5、查看topic

6、 重要(操作日志的处理):

四、kafka设置多代理集群

1、复制文件修改属性

2、启动各个代理

3、查看日志


一、安装Zookerper

1、 下载安装包

http://zookeeper.apache.org/releases.html#download

或在网盘中下载 https://pan.baidu.com/s/1mr2C9x_I2OREvENRyHja6g 提取码:8866

2、 解压并进入ZooKeeper目录,如:D:\Kafka\zookeeper-3.4.9\conf

3、 将“zoo_sample.cfg”重命名为“zoo.cfg”

4、 打开“zoo.cfg”找到并编辑dataDir=D:\\Kafka\\zookeeper-3.4.9\\tmp(必须以\\分割)

5、 添加系统变量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.4.9

6、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin

7、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

8、 打开新的cmd,输入“zkServer“,运行Zookeeper

9、 命令行提示如下:说明本地Zookeeper启动成功

注意:不要关了这个窗口

 

二、安装Kafka

 

2.1安装

1、 下载安装包

http://kafka.apache.org/downloads 或在百度网盘中下载 https://pan.baidu.com/s/1mr2C9x_I2OREvENRyHja6g 提取码:8866

注意要下载二进制版本

2、 解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.12-0.11.0.0

3、 进入config目录找到文件server.properties并打开

4、 找到并编辑log.dirs=D:\Kafka\kafka_2.12-0.11.0.0\kafka-logs,D:\\Kafka\\kafka_2.12-0.11.0.0\\kafka-logs

5、 找到并编辑zookeeper.connect=localhost:2181

6、 Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181

7、cmd 进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,运行

.\bin\windows\kafka-server-start.bat .\config\server.properties 
或bin\kafka-server-start.sh config\server.properties

注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行

 

2.2文件解读

2.2.1 server.properties

1、设置代理的id。

必须为每个代理设置一个唯一的整数。

broker.id=0

2、设置log文件路径

用于存储日志文件,设置log文件夹的路径

log.dirs=E:\\kafka_2.12-2.4.0\\kafka-logs

3、设置代理的访问端口

每个代理的端口都不一样,保持唯一

  • 代理将向生产者和消费者通告主机名和端口。如果没有设置,
  • 如果配置了,它将使用“监听器”的值。否则,它将使用该值从java.net.InetAddress.getCanonicalHostName()返回。

listeners=PLAINTEXT://:9092

4、日志刷新策略

消息会立即写入文件系统,但默认情况下,我们只使用fsync()来进行同步操作系统缓存延迟。以下配置控制将数据刷新到磁盘。

  • 持久性:如果不使用复制,未刷新的数据可能会丢失。
  • 延迟:当刷新发生时,非常大的刷新间隔可能导致延迟峰值,因为有很多数据需要刷新。
  • 吞吐量:刷新通常是最昂贵的操作,较小的刷新间隔可能导致过多的查找。

下面的设置允许配置刷新策略,以在一段时间后或每N条消息(或都是)这可以全局完成,并在每个主题的基础上重写。

  • 强制刷新之前要接受的消息数:flush.interval.messages=10000
  • 在强制刷新之前,消息在日志中停留的最长时间: log.flush.interval.ms=1000

 

5、日志保留政策

以下配置控制日志段的处理。政策可以被设置为在一段时间后删除段,或在一个给定的大小已经积累。

log.retention.hours=168

log.retention.bytes=1073741824

日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。

 

6、组织协调器设置

下面的配置以毫秒为单位指定GroupCoordinator将延迟初始消费者再平衡的时间。

当新成员加入组时,rebalance将被group.initial.rebalance.delay.ms的值进一步延迟,最大值为max.poll.interval.ms。

  • 默认值是3秒。
  • 我们在这里将其改写为0,因为它为开发和测试提供了更好的开箱即用的体验。

然而,在生产环境中,默认值3秒更合适,因为这将有助于避免不必要的,可能昂贵的开销

group.initial.rebalance.delay.ms=0

 

2.3 文件内容

 

  • kafka的数据文件是二进制格式的文件,因为二进制的文件大小相对于文本文件更小,所以可以减少数据传输,复制量,提高数据传输速度,节省网络带宽。
  • 在分区目录下面除了存在数据文件之外,还存在一个索引文件,索引文件的作用是加快在数据文件的检索速度,索引文件也是二进制文件,如下以index结尾的就是索引文件,以log结尾的就是数据文件:
xyzdeMacBook-Pro:mytopic-1 xieyizun$ ls -allh
total 0
drwxr-xr-x  4 xieyizun  wheel   128B  4 27 09:56 .
drwxr-xr-x  6 xieyizun  wheel   192B  4 27 20:26 ..
-rw-r--r--  1 xieyizun  wheel    10M  4 27 09:56 00000000000000000000.index
-rw-r--r--  1 xieyizun  wheel     0B  4 27 09:56 00000000000000000000.log
  • 整个分区的数据不是由一个数据文件存放的,而是由多个segments组成的,即上面看到的0000.log文件是其中一个segment文件,文件名是以该文件的第一个数据相对于该分区的全局offset命名的。每当segment文件达到一定的大小,则会创建一个新的segment文件,具体大小在server.properties配置:默认为1G。
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
  • 而索引文件的文件内容也是offset的稀疏索引,从而在消费者消费消息时,broker根据消费者给定的offset,基于二分查找先在索引文件找到该offset对应的数据segment文件的位置,然后基于该位置(或往下)找到对应的数据。

 

具体内容格式如图所示:

二、Kafka与ZK安装

 

三、测试

 

1、 创建主题

cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下

输入:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

注意:不要关了这个窗口

2、查看主题输入:

kafka-topics.bat --list --zookeeper localhost:2181

2、删除主题

注意删除主题前一定要在配置文件中添加一条配置信息

  • delete.topic.enable=true

如果忘记设置了,那么就会提示删除的topic被标记删除了,而不是真正的删除,所以如果重启Kafka服务的话,可能会报错,

解决方案:

  • 第一步首先在server.properties文件中填加delete.topic.enable=true配置
  • 第二步删除server.properties文件中log.dirs=E:\\kafka_2.12-2.4.0\\kafka-logs-01对应的文件
  • 第三步删除E:\zookerper\apache-zookeeper-3.6.3-bin\conf\zoo.cfg文件中dataDir=E:\\zookerper\\apache-zookeeper-3.6.3-bin\\tmp对应的文件

bin/kafka-topics.bat --zookeeper localhost:2181 --delete --topic Hello-kafka

3、 创建生产者

cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下

输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test

注意:不要关了这个窗口

 

4、 创建消费者

在同一个分组内的消费者,不能同时消费同一个主题下的同一个分区的消息,可以消费同一主题下的不同分区的消息。

1、创建消费者不带分组

cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下

输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

此时,往生产者窗口写入消息,消费者窗口就能同步的接收到消息

  • from-beginning:代表这个消费从分区的开始接受消息
  • 不加from-beginning:则接受分区最新的消息开始

2、创建消费者在同一个组中

用两个cmd窗口分别创建两个consumer,在cmd中输入相同的指令即可,分组名要一样

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --group group01

5、查看topic

cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下

输入:kafka-topics.bat --list --zookeeper localhost:2181

6、 重要(操作日志的处理):

kafka启动后,如果你去查看kafka所在的根目录,或者是kafka本身的目录,会发现已经默认生成一堆操作日志(这样看起来真心很乱):

    而且会不断生成不同时间戳的操作日志。刚开始不知所措,一番研究后,看了启动的脚本内容,发现启动的时候是会默认使用到这个log4j.properties文件中的配置,而在zoo.cfg是不会看到本身的启动会调用到这个,还以为只有那一个日志路径:

在这里配置一下就可以了,找到config下的log4j.properties:

将路径更改下即可,这样就可以归档在一个文件夹下边了,路径根据自己喜好定义:

另外如何消除不断生成日志的问题,就是同一天的不同时间会不停生成。

修改这里,还是在log4j.properties中:

本身都为trace,字面理解为会生成一堆跟踪日志,将其改为INFO即可。

 

四、kafka设置多代理集群

  到目前为止,我们一直在使用单个代理,这并不好玩。对 Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。

首先,为每个代理创建一个配置文件 。

二、Kafka与ZK安装

1、复制文件修改属性

将这个目录下的server.properties的文件复制两份,并重命名为server-1.properties和server-2.properties

现在编辑这些新文件并设置如下属性:

broker.id=1 //id命名为不一样 listeners=PLAINTEXT://:9093 //端口名命名为不一样 log.dirs=D:\\Kafka\\kafka_2.12-2.4.0\\kafka-logs-1 //日志记录创建的文件夹不一样

 

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。

我们已经建立Zookeeper和一个单节点了。

 

2、启动各个代理

 

将各个结点都用新的cmd窗口打开并启动;

cmd 命令进入到kafka文件夹,执行命令:cd D:\Kafka\kafka_2.12-2.4.0 启动命令:.\bin\windows\kafka-server-start.bat .\config\server-1.properties //注意启动的代理不同,最后的文件名就不同

 

创建一个有三个副本的主题,名字为“my-replicated-topic”

bin/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

结果:

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs: Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

 

  • “leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
  • “replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
  • “isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。
  • 请注意,在示例中,节点1是该主题中唯一分区的领导者。

 

3、查看日志

因为在server.properties文件中指定了代理生成的日志文件存放的目录

可以根据log.dirs的属性找到,一下是他们的日志目录

二、Kafka与ZK安装

点开里面有对应的主题名文件夹

二、Kafka与ZK安装

点开主题文件夹,里面有存放消息的日志,但是被序列化了,即.log文件

二、Kafka与ZK安装

 

 

上一篇:监听ZK节点数据修改


下一篇:打怪升级之小白的大数据之旅(五十五)<Zookeeper命令行与API应用>