1 Kafka概述
1.1 定义
● 基于发布/订阅模式的消息队列(Message Queue)—— 传统
● 分布式事件流平台(event streaming platform)—— 最新定位
1.2 消息队列
1.2.1 传统消息队列的应用场景
1)引用场景
2)使用消息队列的好处
解耦
● 允许独立的扩展、修改两边的处理过程。
可恢复性
● 部分组件失效,不影响整体系统。
缓冲
● 有助于控制和优化数据流经过系统的速度
● 解决生产消息和消费消息的处理速度不一致的情况
灵活性和峰值处理能力
● 使用消息队列能够使关键组件顶住突发的访问压力。
异步通信
● 用户把消息放入队列,并表示不想立即处理。
1.2.2 消息队列的两种模式
1)点对点模式(一对一)
● 消费者主动拉去数据,用后消除
● 一个消息只有一个消费者使用(Queue支持存在多个消费者)
2)发布/订阅模式(一对多)
● 优点:发布到topic的消息会被所有的订阅者消费
● 缺点:不断轮询,看看消息出来没(自助烧烤)
1.3 Kafka基础架构
Producer | 消息生产者 |
Consumer | 消息消费者 |
Consumer Group | 消费组 |
Broker | kafka服务器=broker, broker可以容纳多个topic |
Topic | 队列(先这样理解),生产者和消费者面向的都是一个topic |
Partition | 有序的队列 (一个topic可分布到多个broker上,一个topic可以分为多个parition) |
Replication | 副本 |
leader | ”主“,(读写) |
follower | ”从“,(同步数据),实时与leader副本保持同步 |
2 Kafka快速入门
2.1 安装部署
集群规划——Kafka下载
2.1.2 集群部署
解压——修改文件名——修改配置文件——配置环境变量——分发环境变量——分发安装包——修改brokerid——启动集群——关闭集群
配置文件修改内容
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
# broker的全局唯一编号,不能重复
broker.id=102
# 删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小(100M)
socket.request.max.bytes=104857600
# kafka运行日志(数据)存放的路径
log.dirs=/opt/module/kafka/datas
# topic在当前broker上的分区个数
num.partitions=1
# 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
启动kafka集群 | bin/kafka-server-start.sh -daemon config/server.properties |
关闭kafka集群 | bin/kafka-server-stop.sh |
-daemon | daemon又称为守护进程,后台运行。 |
2.1.4 群起脚本
1)脚本编写
查看代码
#! /bin/bash
if (($#==0)); then
echo -e "请输入参数:\n start 启动kafka集群;\n stop 停止kafka集群;\n" && exit
fi
case $1 in
"start")
for host in hadoop103 hadoop102 hadoop104
do
echo "---------- $1 $host 的kafka ----------"
ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for host in hadoop103 hadoop102 hadoop104
do
echo "---------- $1 $host 的kafka ----------"
ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh /opt/module/kafka/config/server.properties"
done
;;
*)
echo -e "---------- 请输入正确的参数 ----------\n"
echo -e "start 启动kafka集群;\n stop 停止kafka集群;\n" && exit
;;
esac
2)脚本文件添加权限
[atguigu@hadoop102 bin]$ chmod +x kafka.sh
2.2 Kafka命令行操作
2.2.1 Topic相关命令操作
查看所有topic | bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list |
创建topic | bin/kafka-topics.sh --bootstrap-server hadoop:9092 --create --replication-factor 2 --partitions 1 --topic first |
--topic 定义topic名 | |
--replication-factor 定义副本数 | |
--partitions 定义分区数 | |
删除topic | bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first |
查看topic详情 | bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first |
修改分区数 | bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3 |
2.2.2 消息的生产和消费
生产消息 | bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first |
消费消息 | bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first |
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beinning --topic first | |
--from-brinning 会把主题中现有的所有的数据读取出来 |
3 Kafka架构深入
3.1 Kafka工作流程及文件存储机制
3.2 Kafka生产者
3.2.1 消息发送流程
Producer发送消息方式:异步发送
两线一变
● main线程:(发)将消息发送给RecordAccumulator。
● Sender线程:(拉)不断从RecordAccumulator中拉取消息发送到Kafka broker
● RecordAccumulator线程共享变量
相关参数:
batch.size:数据累计到batch.size时,sender才会发送数据
linger.ms:如数据迟迟未达到batch.size,sender等待liger.time之后就会发送数据