目录
一、安装Zookerper
1、 下载安装包
http://zookeeper.apache.org/releases.html#download
或在网盘中下载 https://pan.baidu.com/s/1mr2C9x_I2OREvENRyHja6g 提取码:8866
2、 解压并进入ZooKeeper目录,如:D:\Kafka\zookeeper-3.4.9\conf
3、 将“zoo_sample.cfg”重命名为“zoo.cfg”
4、 打开“zoo.cfg”找到并编辑dataDir=D:\\Kafka\\zookeeper-3.4.9\\tmp(必须以\\分割)
5、 添加系统变量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.4.9
6、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
7、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
8、 打开新的cmd,输入“zkServer“,运行Zookeeper
9、 命令行提示如下:说明本地Zookeeper启动成功
注意:不要关了这个窗口
二、安装Kafka
2.1安装
1、 下载安装包
http://kafka.apache.org/downloads 或在百度网盘中下载 https://pan.baidu.com/s/1mr2C9x_I2OREvENRyHja6g 提取码:8866
注意要下载二进制版本
2、 解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.12-0.11.0.0
3、 进入config目录找到文件server.properties并打开
4、 找到并编辑log.dirs=D:\Kafka\kafka_2.12-0.11.0.0\kafka-logs,D:\\Kafka\\kafka_2.12-0.11.0.0\\kafka-logs
5、 找到并编辑zookeeper.connect=localhost:2181
6、 Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
7、cmd 进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,运行
.\bin\windows\kafka-server-start.bat .\config\server.properties
或bin\kafka-server-start.sh config\server.properties
注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行
2.2文件解读
2.2.1 server.properties
1、设置代理的id。
必须为每个代理设置一个唯一的整数。
broker.id=0
2、设置log文件路径
用于存储日志文件,设置log文件夹的路径
log.dirs=E:\\kafka_2.12-2.4.0\\kafka-logs
3、设置代理的访问端口
每个代理的端口都不一样,保持唯一
- 代理将向生产者和消费者通告主机名和端口。如果没有设置,
- 如果配置了,它将使用“监听器”的值。否则,它将使用该值从java.net.InetAddress.getCanonicalHostName()返回。
listeners=PLAINTEXT://:9092
4、日志刷新策略
消息会立即写入文件系统,但默认情况下,我们只使用fsync()来进行同步操作系统缓存延迟。以下配置控制将数据刷新到磁盘。
- 持久性:如果不使用复制,未刷新的数据可能会丢失。
- 延迟:当刷新发生时,非常大的刷新间隔可能导致延迟峰值,因为有很多数据需要刷新。
- 吞吐量:刷新通常是最昂贵的操作,较小的刷新间隔可能导致过多的查找。
下面的设置允许配置刷新策略,以在一段时间后或每N条消息(或都是)这可以全局完成,并在每个主题的基础上重写。
- 强制刷新之前要接受的消息数:flush.interval.messages=10000
- 在强制刷新之前,消息在日志中停留的最长时间: log.flush.interval.ms=1000
5、日志保留政策
以下配置控制日志段的处理。政策可以被设置为在一段时间后删除段,或在一个给定的大小已经积累。
log.retention.hours=168
log.retention.bytes=1073741824
日志段文件的最大大小。当达到这个大小时,将创建一个新的日志段。
6、组织协调器设置
下面的配置以毫秒为单位指定GroupCoordinator将延迟初始消费者再平衡的时间。
当新成员加入组时,rebalance将被group.initial.rebalance.delay.ms的值进一步延迟,最大值为max.poll.interval.ms。
- 默认值是3秒。
- 我们在这里将其改写为0,因为它为开发和测试提供了更好的开箱即用的体验。
然而,在生产环境中,默认值3秒更合适,因为这将有助于避免不必要的,可能昂贵的开销
group.initial.rebalance.delay.ms=0
2.3 文件内容
- kafka的数据文件是二进制格式的文件,因为二进制的文件大小相对于文本文件更小,所以可以减少数据传输,复制量,提高数据传输速度,节省网络带宽。
- 在分区目录下面除了存在数据文件之外,还存在一个索引文件,索引文件的作用是加快在数据文件的检索速度,索引文件也是二进制文件,如下以index结尾的就是索引文件,以log结尾的就是数据文件:
xyzdeMacBook-Pro:mytopic-1 xieyizun$ ls -allh
total 0
drwxr-xr-x 4 xieyizun wheel 128B 4 27 09:56 .
drwxr-xr-x 6 xieyizun wheel 192B 4 27 20:26 ..
-rw-r--r-- 1 xieyizun wheel 10M 4 27 09:56 00000000000000000000.index
-rw-r--r-- 1 xieyizun wheel 0B 4 27 09:56 00000000000000000000.log
- 整个分区的数据不是由一个数据文件存放的,而是由多个segments组成的,即上面看到的0000.log文件是其中一个segment文件,文件名是以该文件的第一个数据相对于该分区的全局offset命名的。每当segment文件达到一定的大小,则会创建一个新的segment文件,具体大小在server.properties配置:默认为1G。
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
- 而索引文件的文件内容也是offset的稀疏索引,从而在消费者消费消息时,broker根据消费者给定的offset,基于二分查找先在索引文件找到该offset对应的数据segment文件的位置,然后基于该位置(或往下)找到对应的数据。
具体内容格式如图所示:
三、测试
1、 创建主题
cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下
输入:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
注意:不要关了这个窗口
2、查看主题输入:
kafka-topics.bat --list --zookeeper localhost:2181
2、删除主题
注意删除主题前一定要在配置文件中添加一条配置信息
- delete.topic.enable=true
如果忘记设置了,那么就会提示删除的topic被标记删除了,而不是真正的删除,所以如果重启Kafka服务的话,可能会报错,
解决方案:
- 第一步首先在server.properties文件中填加delete.topic.enable=true配置
- 第二步删除server.properties文件中log.dirs=E:\\kafka_2.12-2.4.0\\kafka-logs-01对应的文件
- 第三步删除E:\zookerper\apache-zookeeper-3.6.3-bin\conf\zoo.cfg文件中dataDir=E:\\zookerper\\apache-zookeeper-3.6.3-bin\\tmp对应的文件
bin/kafka-topics.bat --zookeeper localhost:2181 --delete --topic Hello-kafka
3、 创建生产者
cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下
输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test
注意:不要关了这个窗口
4、 创建消费者
在同一个分组内的消费者,不能同时消费同一个主题下的同一个分区的消息,可以消费同一主题下的不同分区的消息。
1、创建消费者不带分组
cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下
输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
此时,往生产者窗口写入消息,消费者窗口就能同步的接收到消息
- from-beginning:代表这个消费从分区的开始接受消息
- 不加from-beginning:则接受分区最新的消息开始
2、创建消费者在同一个组中
用两个cmd窗口分别创建两个consumer,在cmd中输入相同的指令即可,分组名要一样
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --group group01
5、查看topic
cmd进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0/bin/windows目录下
输入:kafka-topics.bat --list --zookeeper localhost:2181
6、 重要(操作日志的处理):
kafka启动后,如果你去查看kafka所在的根目录,或者是kafka本身的目录,会发现已经默认生成一堆操作日志(这样看起来真心很乱):
而且会不断生成不同时间戳的操作日志。刚开始不知所措,一番研究后,看了启动的脚本内容,发现启动的时候是会默认使用到这个log4j.properties文件中的配置,而在zoo.cfg是不会看到本身的启动会调用到这个,还以为只有那一个日志路径:
在这里配置一下就可以了,找到config下的log4j.properties:
将路径更改下即可,这样就可以归档在一个文件夹下边了,路径根据自己喜好定义:
另外如何消除不断生成日志的问题,就是同一天的不同时间会不停生成。
修改这里,还是在log4j.properties中:
本身都为trace,字面理解为会生成一堆跟踪日志,将其改为INFO即可。
四、kafka设置多代理集群
到目前为止,我们一直在使用单个代理,这并不好玩。对 Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。
首先,为每个代理创建一个配置文件 。
1、复制文件修改属性
将这个目录下的server.properties的文件复制两份,并重命名为server-1.properties和server-2.properties
现在编辑这些新文件并设置如下属性:
broker.id=1 //id命名为不一样 listeners=PLAINTEXT://:9093 //端口名命名为不一样 log.dirs=D:\\Kafka\\kafka_2.12-2.4.0\\kafka-logs-1 //日志记录创建的文件夹不一样
broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
我们已经建立Zookeeper和一个单节点了。
2、启动各个代理
将各个结点都用新的cmd窗口打开并启动;
cmd 命令进入到kafka文件夹,执行命令:cd D:\Kafka\kafka_2.12-2.4.0 启动命令:.\bin\windows\kafka-server-start.bat .\config\server-1.properties //注意启动的代理不同,最后的文件名就不同
创建一个有三个副本的主题,名字为“my-replicated-topic”
bin/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
结果:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
- “leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
- “replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
- “isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。
- 请注意,在示例中,节点1是该主题中唯一分区的领导者。
3、查看日志
因为在server.properties文件中指定了代理生成的日志文件存放的目录
可以根据log.dirs的属性找到,一下是他们的日志目录
点开里面有对应的主题名文件夹
点开主题文件夹,里面有存放消息的日志,但是被序列化了,即.log文件