Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序!
kafka官网:http://kafka.apache.org/
他的核心功能:
1.高吞吐量:使用延迟低至 2 毫秒的机器集群以网络有限的吞吐量传递消息
2.可扩展:将生产集群扩展到多达一千个代理、每天数万亿条消息、PB 级数据、数十万个分区。弹性扩展和收缩存储和处理。
3.永久存储:将数据流安全地存储在分布式、持久、容错的集群中。
4.高可用性:在可用区上有效地扩展集群或跨地理区域连接单独的集群。
kafka名词解释:
-
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- 批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
- 分区Partition:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性。topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个文件进行存储。partition中的数据是有序的,partition之间的数据是没有顺序的。如果topic有多个partition,消费数据时就不能保证数据的顺序(比如A 和B 中 数据都有序,那把A和B的数据放在一块就无序了)。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1,默认是多个。
-
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
-
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
-
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
- Broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来),相当于leader。
- 副本Replica: Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表(保持同步的副本列表)中删除,重新创建一个Follower。
- Zookeeper: kafka对与zookeeper是强依赖的,是以zookeeper作为基础的,即使不做集群,也需要zk的支持。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行重平衡。
- 消费者群组Consumer Group: 生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。
- 偏移量Consumer Offset: 偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据,与下面某个消费者挂掉后相关联,比如这个消费者已经消费了3条消息,然后挂掉了,那么偏移量就会记录下来,在重平衡的时候其他的消费者也会从4开始平均分配数据。
- 重平衡Rebalance: 消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程,就是说其中一个挂了,那么他的数据会平均分配给其他的消费者。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
在是用kafka的前提下必须是java环境,也就是jdk,其次就是Zookpeer;在启动kafka前必须先启动zk
Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。
这里不做环境准备的演示,直接进入demo
1.添加依赖 做一个简单的生产者与消费组 只需要依赖kafka-clients
即可
<!--kafka依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
2.创建一个Kafka生产者
package com.xiaoteng.kafka.simple;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 消息生产者
*/
public class ProducerFastStart {
//topic
private static final String TOPIC = "kafkaDemo";
public static void main(String[] args) {
//添加kafka的配置信息
Properties properties = new Properties();
//配置broker信息
properties.put("bootstrap.servers","192.168.200.130:9092");
//key 和 value 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//重试次数为10 当发送消息失败时 重试 10次后不成功会报错
properties.put(ProducerConfig.RETRIES_CONFIG,10);
//生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
//封装消息 第一个为主题 topic 第二个发送消息的key 第三个就是我们发送的消息
ProducerRecord<String,String> record = new ProducerRecord<String, String>(TOPIC,"msg","hello kafka!");
//发送消息
try {
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
//关系消息通道
producer.close();
}
}
3.创建消费者
package com.xiaoteng.kafka.simple;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消息消费者
*/
public class ConsumerFastStart {
//topic
private static final String TOPIC = "kafkaDemo";
public static void main(String[] args) {
//添加配置信息
Properties properties = new Properties();
//ip端口
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
//key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置分组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅主题 必须与生产者保持一致
consumer.subscribe(Collections.singletonList(TOPIC));
while (true){
//收消息间隔 与 消息载体
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
}
}
}
}
这就已经实现了简单的生产者发送消息,消费者接收消息。
1.可以看到消费者当前的组为group1,当我启动两个消费者,接收同一个topic,组名都为group1时,生产者去发送消息,同一个组内,只要一个消费者接收到消息。
2.当已经启动了一个group1时,再启动一个gruop2,接收同一个topic时,两个都能接收到生产者发送的消息。
3.当要实现广播功能时,参考2就行,每个消费者都有自己的一个组。每个组只能有一个消费者接收到消息!
生产者的工作原理:
1.连接上kafka集群
2.发消息
3.leader将消息写入本地文件
4.将消息同步一份给追随者follower 副本
5.追随者follwer 将数据写入本地文件,并将ack(确认消息)
6.leader收到所有副本的确认消息后,向生产者发送确认消息
ack也分不同配置,不同工作原理。
1.发送类型:
1.发送并忘记(fire-and-forget): 把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息,因为他没有返回ack
2.同步发送: 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
try {
RecordMetadata recordMetadata = producer.send(record).get();
long offset = recordMetadata.offset();//偏移量
System.out.println(offset);
}catch (Exception e){
e.printStackTrace();
}
如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量,并且偏移量是一次递增的。
3.异步发送: 调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码
try { producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//如果异常为空,说明异步发送成功
if (null!=exception){
exception.printStackTrace();
}
//获取偏移量
System.out.println(metadata.offset());
}
});
}catch (Exception e){
e.printStackTrace();
}
如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
2.参数详解:
通过上面的入门代码,已经看到了几个必要的参数(bootstrap.servers、序列化器,重试等) 基本上都在producerConfig这个类中能找到
bootstrap.servers:就是kafka连接地址与端口号。
retries重试:生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
生产者还有很多可配置的参数,在kafka官方文档中都有说明,大部分都有合理的默认值,所以没有必要去修改它们,不过有几个参数在内存使用,性能和可靠性方法对生产者有影响
acks:指的是producer的消息发送确认机制
acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应,也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。高吞吐量,低可靠性。
acks=1: 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。默认情况下是等于1,也是吞吐量与可靠性的一个折中方案。 但是等于1的情况下,也有可能出现数据丢失,就是leader返回一个acks给producer后,还没得及同步消息给follwoer就挂了,导致数据丢失了。
acks=all : 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过他的延迟比acks=1时更高。低吞吐量,高可靠性。
消费者工作原理:
1.连接kafka集群
2.从kafka集群中拉取信息
3.kafka集群根据偏移量查询信息 offser就是偏移量 一个long型数值
4.将数据同步给消费者
1.参数详解:
与生产者类似 基本上都在ConsumerConfig这个类中能找到,向上面的入门代码中,连接kafka,反序列化,以及分组都有 或者直接去kafka文档看。
eauto.commit: 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。如果把它设置为true,还可以通过配置auto.commit.interval.ms
属性来控制提交的频率(就是多长时间提交一次)。
auto.offset.reset:
1.earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2.latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
3.none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
4.anything else(其他情况,不是上面三种情况就抛出异常): 向consumer抛出异常
2.提交和偏移量:
每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
消费者会往一个叫做_consumer_offset
的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。如下图:
消费者当前提交的偏移量为2,而他已经消费到11了,当他挂了时,在进行重平衡的时候,是从提交的偏移量开始继续消费的,导致了数据的重复消费。
如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。如下图:
当他提交的偏移量为11时,而他只消费到了3,在他挂了一个进行重平衡时,是从提交的偏移量开始消费的,导致数据的丢失。
3.自动提交偏移量:
当enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms
控制,默认值是5秒。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
4.提交当前偏移量(同步提交)
把enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
主要分为两部:1.把enable.auto.commit
设置为false 2.
//设置同步提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
while (true){
//收消息间隔 与 消息载体
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key());
System.out.println(record.value());
try {
//手动/同步 提交 偏移量
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.提交当前偏移量(异步提交)
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
}
}
});
}
5.提交当前偏移量(同步和异步组合提交)
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try {
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
//异步提交
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
System.out.println("记录错误信息:"+e);
}finally {
try {
//不管异步成功与否,都同步提交一下,确保消息每一次都提交了
consumer.commitSync();
}finally {
consumer.close();
}
}
SpringBoot集成Kafka
1.依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaoteng.kafka</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<!-- 继承Spring boot工程 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<properties>
<kafka.version>2.2.7.RELEASE</kafka.version>
<kafka.client.version>2.0.1</kafka.client.version>
<fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.client.version}</version>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
</project>
2.application.yml 配置文件,我这是偷懒,把两者都放到一块了,既是生产者也是消费者
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-hello-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.创建生产者
package com.xiaoteng.kafka.controller;
import com.alibaba.fastjson.JSON;
import com.xiaoteng.kafka.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/hello")
public String hello() {
//第一个参数:topics
//第二个参数:消息内容 字符串信息
kafkaTemplate.send("hello-kafka", "哈喽kafka");
return "ok";
}
@GetMapping("/hello2")
public String hello2() {
User u = new User();
u.setName("张三");
//第一个参数:topics [] "{}"
//第二个参数:消息内容 对象转为json 把对象发过去
kafkaTemplate.send("hello-kafka2", JSON.toJSONString(u));//发送消息
return "消息ok";
}
}
4.消费者
package com.xiaoteng.kafka.listener;
import com.alibaba.fastjson.JSON;
import com.xiaoteng.kafka.entity.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Optional;
@Component
public class HelloListener {
@KafkaListener(topics = {"hello-kafka"})
public void receiverMessage(ConsumerRecord<?, ?> record) {
//接收信息,判断是否为空
Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
//如果不为空
if (optional.isPresent()) {
//拿到对象信息
Object value = record.value();
System.out.println(value);
//展示 哈喽kafka
}
}
@KafkaListener(topics = {"hello-kafka2"})
public void listener(String record) {
//拿到信息 转为对象
User user = JSON.parseObject(record, User.class);
System.out.println("record:" + record);
System.out.println("对象" + user);
//展示 record:{"name":"张三"}
//展示 对象User{name='张三'}
//我这里的user对象 只设置了一个Name属性
}
}