简述
Kafka是一个分布式流平台,本质是一个消息队列。消息队列的三个作用:异步、消峰和解耦。
一. 安装zookeeper
1.1. 下载并解压
# 下载
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
1.2. 修改配置
这里我们需要将解压目录中的config
目录里面的zoo_sample.cfg
复制一份为zoo.cfg
。
这个配置文件就是zookeeper
的配置文件,整体来说我们不需要修改,但这里我们别为了不影响其他的文件目录,只为了我们测试使用,需要修改下zookeeper
的数据目录。
# CS通信心跳时间
tickTime=2000
# 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
initLimit=10
# 集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数
syncLimit=5
# 该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等
dataDir=/tmp/zookeeper
# 客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问!这个端口默认是2181
clientPort=2181
#maxClientCnxns=60
#autopurge.snapRetainCount=3
#autopurge.purgeInterval=1
## Metrics Providers
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
我们这里修改dataDir=/tmp/zookeeper
为刚才我们解压的目录!
1.3. 启动zookeeper
进入bin目录,执行./zkServer.sh start
二. 安装kafka
2.1. 下载并解压
# 下载
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
# 解压
tar -zxf kafka_2.12-2.6.0.tgz
2.2. 修改配置文件
我们修改三个位置:
配置项 | 值 | 描述 |
---|---|---|
listeners | PLAINTEXT://192.168.31.26:9092 | broker 服务器要监听的地址及端口 . 默认是 localhost:9092 ,0.0.0.0 的话 ,表示监听本机的所有ip地址. |
advertiesd.listeners | PLAINTEXT://192.168.31.26:9092 | 这个是对外提供的地址 , 当client 请求到kafka时, 会分发这个地址. |
log.dirs | /home/long/kakfa_test/kafka_data | kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-eogs-1,/tmp/kafka-logs-2 |
# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.31.26:9092
advertiesd.listeners=PLAINTEXT://192.168.31.26:9092
# broker处理消息的最大线程数,一般情况下数量为cpu核数
num.network.threads=3
# broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=8
# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400
# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/home/long/kakfa_test/kafka_data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# 检查是否需要固化到硬盘的时间间隔
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 数据文件保留多长时间
log.retention.hours=168
#log.retention.bytes=1073741824
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
2.3. 启动kafka
执行 bin/kafka-server-start.sh config/server.properties
,就可以启动kafka
了。
上面的方法是阻塞执行的,我们可以通过-daemon
进行守护执行。
bin/kafka-server-start.sh -daemon config/server.properties
停止 kafka
,需要执行 bin/kafka-server-stop.sh
三、kafka的基本使用
3.1. kafka中的一些概念
值 | 描述 |
---|---|
topic | 一个虚拟的概念,由1到多个partition组成 |
partition | 实际消息存储单位 |
producer | 消息生产者 |
consumer | 消息消费者 |
3.2. 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic long-topic
参数解释:
参数 | 解释 |
---|---|
–create | 创建topic动作指令 |
–zookeeper | 制定kafka所连接的zookeeper服务地址 |
–topic | 指定了所创建的主题名称 |
–replication-factor | 指定副本因子 |
–partitions | 指定分区个数 |
3.3. 查看所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4. 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic long-topic
3.5. 详情topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic long-topic
3.6. 发送和接受消息
发送消息
bin/kafka-console-producer.sh --broker-list 192.168.31.26:9092 --topic long-topic
接受消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.26:9092 --topic long-topic --from-beginning
四. kafka涉及名称解释
根据下面的图了解下Kafka涉及到相关名词:
重要名词
名词 | 解释 |
---|---|
Producer | 消息生产者,该角色将消息发布到kafka的topic中,broker接受到生产者发送的消息后,broker将该消息追加到当前追加数据的segment文件中。生产者发送的消息,存储到一个parition中,生产者也可以指定数据存储到parition |
Consumer | 消息消费者,向Kafka Broker取消息的客户端。消费者可以消费多个topic中的数据。 |
Consumer Group | 消费组,由多个Consumer组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 |
Broker | 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。 |
Topic | 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic。可以将kafka看作是一个数据库,topic相当于数据库中的一张表,topic相当于表名。 |
Partition | 为了实现扩展性,一个非常大的 topic 可以分割成一个或者多个parition。每一个topic至少拥有一个parition。每一个parition中的数据使用多个segment文件存储。parition是有序的,但是多个parition之间是没有顺序的,在要保证消息的消费顺序的时候,需要将parition数目设置为1。 |
partition offset | 每条消息都有一个当前parition下唯一的64字节的offset,它指明了这条消息的起始位置。 |
Replica | 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。replicas是不会被消费者消费的。 |
Leader | 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。 |
Follower | 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。 |
zookeeper | 负责维护和协调broker。当kafka系统增加broker或者broker发生故障失效时,由zookeeper通知生产者和消费者,生产者和消费者依据zookeeper的broker状态信息于broker协调数据的发布和订阅任务 |
AR(Assigned Replicas) | 分区中所有的副本统称为AR |
ISR(In Sync Replicas) | 所有与Leader部分保持一定程度的副本(包括leader的副本在内)组成ISR |
OSR | 与Leader副本同步滞后过多的副本 |
HW(High Watermark) | 高水位,标识一个特定的offset,消费者只能拉取到这个offset之前的消息 |
LEO(Log End Offset) | 即日志末端位移,记录了该副本底层日志中下一条消息的位移值。注意是下一条消息! |
五. Kafka特点
特性:
- 高吞吐、低延迟:kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个主体可以分为多个分区,消费组对分区进行消费操作
- 可扩展性,kafka集群支持热扩展
- 持久性、可靠性,消息可以被持久到本地磁盘,并且支持数据备份防止数据丢失
- 容错性,允许集群结点失败(若副本数量为n,则允许失败n-1个结点)
- 高并发:支持数千个客户端同时读写
六. 使用场景
场景:
- 日志收集:kafka可以收集各种服务的log,通过kafka以统一接口服务开放给各种消费者
- 消息系统:解藕生产者和消费者、缓冲消息等
- 用户活动跟踪:kafka可以用来记录web用户或者app的各种活动操作,做实时监控分析,或者装载到hadoop、数据仓库中做离线分析和数据挖掘
- 运营指标:kafka也可以用来记录运营监控数据。
- 流式处理:storm
七. 技术优势
可伸缩性:
- kafka在运行期间可以轻松的扩展和收缩(可以添加和删除代理),而不会宕机
- 可以扩展一个kafka主题包来包含更多的分区;由于一个分区无法扩展到多个代理,所以它的容量收到代理磁盘空间限制,能够增加分区和代理的数量意味着当个主题可以存储的数据量没有限制。
容错性和可靠性:
kafka的设计方式使某一个代理的故障能够被集群中其他的代理监测到,由于每一个主体都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此故障中继续运行。
吞吐量:
代理能够以快速有效的存储和检索数据
八. Topic、Parition、Broker、Replica、leader和follower之间的关系
- 在
Kafka
中是以Broker
区分集群内服务器的;同一个Topic
下,多个Parition
经Hash
分布到不同的Broker
; - 一个
Topic
(主题)对应多个Parition
(分区),这里Parition
分布在不同的Broker
上,多个Broker
一起提供Kafka
服务;Parition默认是1,不可以减少Parition的数量,但是可以增加;如果想要减少就需要删除原先的Topic
,然后创建新Topic
,重新设置分区数 - 同一个
Topic
中的不同Parition
中数据有顺序性,但是Parition
之间不存在数据顺序性; - 每个
Parition
都会有多个数据Replica
(副本),这些Replica
分布于不同的Broker
中,这些副本中会一个副本是Leader
,其他的副本是Follower
; - 当
producer
或者consumer
发往Parition
的请求,都是通过leader
数据副本所在broker
进行处理,当leader
所在的Broker
发生故障,这个Broker
会成暂时不可用,kakfa
会自动从其他副本选择一个leader
用于接收客户端请求; - 保证在
broker
之间平均分布partition
副本,每个副本分布在不同的broker
上,broker
分布可用轮询或哈希。
九. Java操作Kafka
引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
生产者:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
/**
* @author kirin.麒麟
* @version 1.0.0
* @classname KafkaProducer
* @desc Kafka生产者
* @date 2022/1/9 4:21 下午
*/
public class Producer {
/**
* 主题
*/
public final static String Topic = "test-topic";
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
// 集群通过逗号分割
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
// 设置key和value的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
try {
for (int i = 0; i < 100; i++) {
String msg = "Hello, " + new Random().nextInt(100);
// 构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(Topic, msg);
// 发送
kafkaProducer.send(record);
System.out.println("消息发送:" + msg);
Thread.sleep(500);
}
} finally {
// 执行完之后关闭
kafkaProducer.close();
}
}
}
十. Kafka集群搭建
这里使用一个zookeeper
+三个kafka
组成kafka
集群,如果zookeeper
也需要集群的话可以从前面文章的zookeeper
集群搭建获取shell
脚本。
这里需要注意下面几点:
节点 | kafka-1 | kafka-2 | kafka-3 |
---|---|---|---|
broker.id | 1 | 2 | 3 |
listeners | PLAINTEXT://192.168.0.117:9091 | PLAINTEXT://192.168.0.117:9092 | PLAINTEXT://192.168.0.117:9093 |
advertised.listeners | PLAINTEXT://192.168.0.117:9091 | PLAINTEXT://192.168.0.117:9092 | PLAINTEXT://192.168.0.117:9093 |
log.dirs | kafka-1/data | kafka-2/data | kafka-3/data |
zookeeper.connect | 192.168.0.xxx:2181 | 192.168.0.xxx:2181 | 192.168.0.xxx:2181 |
搭建脚本附上:
install_kafka() {
echo "安装kafka集群"
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -zxvf kafka_2.12-2.8.1.tgz
mv kafka_2.12-2.8.1 kafka-1
cp -r kafka-1 kafka-2
cp -r kafka-1 kafka-3
mkdir kafka-{1,2,3}/data
BasePath=$(pwd)
for (( i = 1; i <= 3; i++ )); do
#statements
sed -i "/^broker.id/cbroker.id=$i" kafka-$i/config/server.properties
# #listeners=PLAINTEXT://:9092
sed -i "/^#listeners/clisteners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
# #advertised.listeners=PLAINTEXT://your.host.name:9092
sed -i "/^#advertised.listeners/cadvertised.listeners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
# log.dirs=/tmp/kafka-logs
sed -i "/^log.dirs/clog.dirs=$BasePath\/kafka-$i\/data" kafka-$i/config/server.properties
# zookeeper.connect=localhost:2181
sed -i '/^zookeeper.connect=/czookeeper.connect=192.168.0.117:2181' kafka-$i/config/server.properties
done
}
start() {
echo "启动kafka集群"
for (( i = 1; i <= 3; i++ )); do
./kafka-$i/bin/kafka-server-start.sh -daemon ./kafka-$i/config/server.properties
done
}
stop() {
echo "停止kafka集群"
for (( i = 1; i <= 3; i++ )); do
./kafka-$i/bin/kafka-server-stop.sh
done
}
echo "Kafka集群脚本"
case $1 in
install)
install_kafka
;;
start)
start
;;
stop)
stop
;;
esac
上面演示的是伪集群搭建,学习Kafka足够了,在实际使用的推荐先搭建zookeeper集群在搭建kafka集群。