08 kafka

1 Kafka概述

1.1 定义

● 基于发布/订阅模式的消息队列(Message Queue)—— 传统

● 分布式事件流平台(event streaming platform)—— 最新定位

1.2 消息队列

1.2.1 传统消息队列的应用场景

1)引用场景

08 kafka

2)使用消息队列的好处

解耦

● 允许独立的扩展、修改两边的处理过程。

可恢复性

● 部分组件失效,不影响整体系统。

缓冲

● 有助于控制和优化数据流经过系统的速度

● 解决生产消息和消费消息的处理速度不一致的情况

灵活性和峰值处理能力

● 使用消息队列能够使关键组件顶住突发的访问压力。

异步通信

● 用户把消息放入队列,并表示不想立即处理。

1.2.2 消息队列的两种模式

1)点对点模式(一对一)

08 kafka

● 消费者主动拉去数据,用后消除

● 一个消息只有一个消费者使用(Queue支持存在多个消费者)

2)发布/订阅模式(一对多)

08 kafka

● 优点:发布到topic的消息会被所有的订阅者消费

● 缺点:不断轮询,看看消息出来没(自助烧烤)

1.3 Kafka基础架构

08 kafka

Producer 消息生产者
Consumer 消息消费者
Consumer Group 消费组
Broker kafka服务器=broker, broker可以容纳多个topic
Topic 队列(先这样理解),生产者和消费者面向的都是一个topic
Partition 有序的队列 (一个topic可分布到多个broker上,一个topic可以分为多个parition)
Replication 副本
leader ”主“,(读写)
follower ”从“,(同步数据),实时与leader副本保持同步

 

2 Kafka快速入门

2.1 安装部署

集群规划——Kafka下载

2.1.2 集群部署

解压——修改文件名——修改配置文件——配置环境变量——分发环境变量——分发安装包——修改brokerid——启动集群——关闭集群

配置文件修改内容

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
# broker的全局唯一编号,不能重复
broker.id=102
# 删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小(100M)
socket.request.max.bytes=104857600
# kafka运行日志(数据)存放的路径
log.dirs=/opt/module/kafka/datas
# topic在当前broker上的分区个数
num.partitions=1
# 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
启动kafka集群 bin/kafka-server-start.sh -daemon config/server.properties
关闭kafka集群 bin/kafka-server-stop.sh
-daemon daemon又称为守护进程,后台运行。

2.1.4 群起脚本

1)脚本编写

查看代码

#! /bin/bash
if (($#==0)); then
  echo -e "请输入参数:\n start  启动kafka集群;\n stop  停止kafka集群;\n" && exit
fi

case $1 in
  "start")
    for host in hadoop103 hadoop102 hadoop104
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
      done
      ;;
  "stop")
    for host in hadoop103 hadoop102 hadoop104
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh /opt/module/kafka/config/server.properties"
      done
      ;;
    *)
        echo -e "---------- 请输入正确的参数 ----------\n"
        echo -e "start  启动kafka集群;\n stop  停止kafka集群;\n" && exit
      ;;
esac

2)脚本文件添加权限

[atguigu@hadoop102 bin]$ chmod +x kafka.sh

2.2 Kafka命令行操作

2.2.1 Topic相关命令操作

查看所有topic bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
创建topic bin/kafka-topics.sh --bootstrap-server hadoop:9092 --create --replication-factor 2 --partitions 1 --topic first
  --topic  定义topic名
  --replication-factor  定义副本数
  --partitions  定义分区数
删除topic bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
查看topic详情 bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
修改分区数 bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

2.2.2 消息的生产和消费

生产消息 bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
消费消息 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
  bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --from-beinning --topic first
  --from-brinning   会把主题中现有的所有的数据读取出来

 

3 Kafka架构深入

3.1 Kafka工作流程及文件存储机制

3.2 Kafka生产者

3.2.1 消息发送流程

Producer发送消息方式:异步发送

两线一变

● main线程:(发)将消息发送给RecordAccumulator。

● Sender线程:(拉)不断从RecordAccumulator中拉取消息发送到Kafka broker

● RecordAccumulator线程共享变量

08 kafka

相关参数:

batch.size:数据累计到batch.size时,sender才会发送数据

linger.ms:如数据迟迟未达到batch.size,sender等待liger.time之后就会发送数据

 

上一篇:JUC并发编程基石AQS之主流程源码解析


下一篇:Kafka在zookeeper中的存储