Day542.kafka基础

kafka

一、基础架构

Day542.kafka基础
Day542.kafka基础


二、Kafka 快速入门

1、集群规划

Day542.kafka基础

2、集群部署

下载地址

1 )解压安装包

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

2 )修改解压后的文件名称:

mv kafka_2.12-3.0.0/ kafka

3 )进入到/opt/module/kafka 目录,修改配置文件

cd config/
vim server.properties

输入以下内容:
Day542.kafka基础
Day542.kafka基础


#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

3、集群 启停脚本

在/home/atguigu/bin 目录下创建文件 kf.sh 脚本文件

vim kf.sh

脚本如下

#! /bin/bash
case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo " --------启动 $i Kafka-------"
		ssh  $i  "/opt/module/kafka/bin/kafka-server-start.sh  -daemon /opt/module/kafka/config/server.properties"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo " --------停止 $i Kafka-------"
		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
	done
};;
esac

添加执行权限

chmod +x kf.sh

启动集群命令

kf.sh start

停止集群命令

kf.sh stop

Day542.kafka基础


3、Kafka 命令行操作

Day542.kafka基础
Day542.kafka基础

4、生产者命令 行操作

Day542.kafka基础
Day542.kafka基础


三、Kafka 生产者

1、生产者 消息发送流程

①发送原理

在消息发送的过程中,涉及到了 两个线程 ——main 线程Sender 线程

在 main 线程中创建了 一个 双端列队列 RecordAccumulator

main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

Day542.kafka基础

②生产者重要参数列表

Day542.kafka基础
Day542.kafka基础


2、异步送 发送 API

①普通异步发送

需求:创建 Kafka生产者,采用异步的方式发送到 Kafka Broker

导入依赖

<dependencies>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>3.0.0</version>
	</dependency>
</dependencies>

编写不带回调函数的 API代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
	public  static  void  main(String[]  args)  throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
		"hadoop102:9092");
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringSerializer");
		
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String,  String>  kafkaProducer  =  new
		KafkaProducer<String, String>(properties);
		
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
		kafkaProducer.send(new
		ProducerRecord<>("first","atguigu " + i));
		}
		
		// 5. 关闭资源
		kafkaProducer.close();
	}
}

②带回调函数的 异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(RecordMetadata)和异常信息(Exception)

如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
	public  static  void  main(String[]  args)  throws InterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	
	// 2. 给 kafka 配置对象添加配置信息
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
	"hadoop102:9092");
	// key,value 序列化(必须):key.serializer,value.serializer
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	
	// 3. 创建 kafka 生产者对象
	KafkaProducer<String,  String>  kafkaProducer  =  new KafkaProducer<String, String>(properties);
	
	// 4. 调用 send 方法,发送消息
	for (int i = 0; i < 5; i++) {
		// 添加回调
		kafkaProducer.send(new  ProducerRecord<>("first","atguigu " + i), new Callback() {
		// 该方法在 Producer 收到 ack 时调用,为异步调用
		@Override
		public void onCompletion(RecordMetadata metadata,Exception exception) {
			if (exception == null) {
				// 没有异常,输出信息到控制台
				System.out.println(" 主 题 : "  +
				metadata.topic() + "->" + "分区:" + metadata.partition());
			} else {
				// 出现异常打印
				exception.printStackTrace();
			}
		}
	});
	// 延迟一会会看到数据发往不同分区
	Thread.sleep(2);
	}
	// 5. 关闭资源
	kafkaProducer.close();
	}
}

③同步发送 API

只需在异步发送的基础上,再调用一下 get()方法即可。
Day542.kafka基础


四、生产者分区

1、分区好处

Day542.kafka基础

2、生产者发送消息的分区策略

①默认的分区器 DefaultPartitioner

Day542.kafka基础

②自定义分区器

实现步骤

  • (1)定义类实现 Partitioner 接口。
  • (2)重写 partition()方法。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
	/**
	* 返回信息对应的分区
	* @param topic 主题
	* @param key 消息的 key
	* @param keyBytes 消息的 key 序列化后的字节数组
	* @param value 消息的 value
	* @param valueBytes 消息的 value 序列化后的字节数组
	* @param cluster 集群元数据可以查看分区信息
	* @return
	*/
	@Override
	public  int  partition(String  topic,  Object  key,  byte[]
	keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// 获取消息
		String msgValue = value.toString();
		// 创建 partition
		int partition;
		// 判断消息是否包含 atguigu
		if (msgValue.contains("atguigu")){
		partition = 0;
		}else {
		partition = 1;
		}
		// 返回分区号
		return partition;
	}
	
	// 关闭资源
	@Override
	public void close() {
	}
	
	// 配置方法
	@Override
	public void configure(Map<String, ?> configs) {
	}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
	public  static  void  main(String[]  args)  throws InterruptedException {
	Properties properties = new Properties();
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
	:9092");
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	
	// 添加自定义分区器
	properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atgui
	gu.kafka.producer.MyPartitioner");
	KafkaProducer<String,  String>  kafkaProducer  =  new
	KafkaProducer<>(properties);
	
	for (int i = 0; i < 5; i++) {
		kafkaProducer.send(new  ProducerRecord<>("first",
		"atguigu " + i), new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata,
		Exception e) {
		if (e == null){
		System.out.println(" 主 题 : "  +
		metadata.topic() + "->" + "分区:" + metadata.partition()
		);
		}else {
		e.printStackTrace();
		}
		}
		});
	}
	kafkaProducer.close();
	}
}

五、生产者 如何提高吞吐量

Day542.kafka基础

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {
	public  static  void  main(String[]  args)  throwsInterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	
	// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
	"hadoop102:9092");
	// key,value 序列化(必须):key.serializer,value.serializer
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
	"org.apache.kafka.common.serialization.StringSerializer");
	// batch.size:批次大小,默认 16K
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
	// linger.ms:等待时间,默认 0
	properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
	// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
	33554432);
	// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
	properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
	
	// 3. 创建 kafka 生产者对象
	KafkaProducer<String,  String>  kafkaProducer  =  new
	KafkaProducer<String, String>(properties);
	
	// 4. 调用 send 方法,发送消息
	for (int i = 0; i < 5; i++) {
		kafkaProducer.send(new
		ProducerRecord<>("first","atguigu " + i));
	}
	// 5. 关闭资源
	kafkaProducer.close();
	}
}

六、数据可靠性

回顾发送流程
Day542.kafka基础

ack 应答原理
Day542.kafka基础
ACK应答级别
Day542.kafka基础

Day542.kafka基础
在配置properties中指定使用对应的ack级别
Day542.kafka基础


七、数据去重

1、数据传递语义

Day542.kafka基础

2、幂等性

①幂等性原理

Day542.kafka基础

②如何使用幂等性

开启参数 enable.idempotence 默认为 true,false关闭。

3、生产者事务

①Kafka事务原理

Day542.kafka基础

②Kafka的事务一共有如下 5个 API

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction() throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String  consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction() throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

③单个 Producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
	public  static  void  main(String[]  args)  throws InterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	
	// 2. 给 kafka 配置对象添加配置信息
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
	"hadoop102:9092");
	// key,value 序列化
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
	StringSerializer.class.getName());
	
	// 设置事务 id(必须),事务 id 任意起名
	properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
	"transaction_id_0");
	
	// 3. 创建 kafka 生产者对象
	KafkaProducer<String,  String>  kafkaProducer  =  new KafkaProducer<String, String>(properties);
	
	// 初始化事务
	kafkaProducer.initTransactions();
	
	// 开启事务
	kafkaProducer.beginTransaction();
	
	try {
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
			// 发送消息
			kafkaProducer.send(new  ProducerRecord<>("first",
			"atguigu " + i));
		}
		// int i = 1 / 0;
		
		// 提交事务
		kafkaProducer.commitTransaction();
		
	} catch (Exception e) {
		// 终止事务
		kafkaProducer.abortTransaction();
	} finally {
	
		// 5. 关闭资源
		kafkaProducer.close();
		}
	}
}

八、数据有序

Day542.kafka基础

九、数据乱序

Day542.kafka基础


上一篇:Spring注解之@PropertySource注解加载配置文件的属性


下一篇:SpringBoot05——自动配置原理