一、下载安装
kafka官网:http://kafka.apache.org/intro
(可选)看下官网的introduction
选择get started-quick start ,按照步骤来(官网的例子不是每个版本都能用,所以我这里稍作修改)
1.下载
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
注:还需要有jdk环境
jdk下载:https://www.oracle.com/java/technologies/downloads/,
选择XXX-linux-XX.tar.gz的包下载
安装jdk
(1)tar -zxf 解压文件(压缩为gzip)
(2)配置环境变量,修改.bash_profile
ll -a 查看隐藏文件夹
修改命令:vim -> i ->esc ->shift + : ->wq/q!
在文件后添加
export JAVA_HOME=/root/jdk/jdk1.8.0_311
export PATH=$JAVA_HOME/bin:$PATH
(3)source .bash_profile使配置文件生效
(4)java -version测试安装是否成功
2.启动环境
注:高版本kafka内置zookeeper
cd 到安装目录下比如我的是/root/kafka/kafka_2.13-3.0.0
启动zookeeper端:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka端:
bin/kafka-server-start.sh -daemon config/server.properties
lsof -i:2181 --检查zookeeper是否启动
lsof -i:9092 --检查kafka是否启动
–俩端口都在config/server.properties配置,这里是默认值
–若启动不成功,可查看logs/server.log查看错误原因
lsof命令还没安装的,可以通过下面命令安装一下。
curl -o /etc/yum.repos.d/CentOS-Base.repo https://www.xmpan.com/Centos-6-Vault-Aliyun.repo
yum install lsof
3.创建Topic (kafka端)
具体语法查看(10)
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server 192.168.124.128:9092
若报错,需要配置config/server.properties的listeners host_name填你自己的ip
监听端口指定 listeners=PLAINTEXT://192.168.124.128:9092
对外部暴露端口 advertised.listeners=PLAINTEXT://192.168.124.128:9092
服务器端口对应暴露端口 listener.security.protocol.map=PLAINTEXT:PLAINTEXT
查看topic情况
bin/kafka-topics.sh --list --bootstrap-server 192.168.124.128:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 192.168.124.128:9092
4.往Topic中添加事件(生产者)
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.124.128:9092
This is my first event
This is my second event
…
Ctrl+C退出保存
5.复制会话,打开另一session,读取事件 (消费者)
–生产者若不退出,可以持续的添加事件
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.124.128:9092
此时再生产者输入,消费者会一逐条接收到消息
6.注入数据创建事件流
7.实现的kafka事件流
可以从API开始看,这里有详细的使用教程,包括改导入什么依赖
http://kafka.apache.org/documentation/#producerapi
kafka流的编写,主要需要创建Pipe.java、LineSplit.java、WordCount.java,示例一步步加强功能
https://kafka.apache.org/30/documentation/streams/tutorial
或者你们直接看我写的,参照官网自己码的代码
(1)添加相关topic(参照第3步):我这里用到了TextLinesTopic、streams-plaintext-input、streams-linesplit-output、streams-pipe-output、streams-wordcount-output等。
(2)在pom.xml添加相关依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_2.13</artifactId>
<version>3.0.0</version>
</dependency>
(3)kafka事件流编写
package com.example.springb_web.utils.Kafka;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
//它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
//指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
//记录键值对的默认序列化和反序列化库
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
//创建输入输出流input/output
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.to("streams-pipe-output");
final Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
//添加CountdownLatch在终止该程序时关闭客户端
final CountDownLatch latch = new CountDownLatch(1);
//若虚拟机停止kafka如Ctrl+C,关闭kafka流
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
//调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
latch.await();
} catch (Throwable e) {
//0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
System.exit(1);
}
System.exit(0);
}
}
package com.example.springb_web.utils.Kafka;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
//效果把输入分词换行后输出。如生产者输入hello world,消费者为hello world两次输出
public class LineSplit {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
//它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-lineSplit");
//指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
//记录键值对的默认序列化和反序列化库
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
//创建输入输出流input/output,每一个流都是String类型化的键值对 需要创建streams-plaintext-input等topic
KStream<String, String> source = builder.stream("streams-plaintext-input");
//* 将值字符串视为文本行,使用FlatMapValues将其拆分为单词
/*KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split("\\W+"));
}
});*/
//KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
//source.to("streams-linesplit-output");
//lamda表达式写法结合流式写法
source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");
final Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
//添加CountdownLatch在终止该程序时关闭客户端
final CountDownLatch latch = new CountDownLatch(1);
//若虚拟机停止kafka如Ctrl+C,关闭kafka流
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
//调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
latch.await();
} catch (Throwable e) {
//0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
System.exit(1);
}
System.exit(0);
}
}
package com.example.springb_web.utils.Kafka;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/*分词并统计出现单词次数。调用方法不同于Pipe和lineSplit,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer,不然无法看到正常的output
* bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.128:9092\
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
*
* */
public class WordCount {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
//它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-lineSplit");
//指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
//记录键值对的默认序列化和反序列化库
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
//创建输入输出流input/output,每一个流都是String类型化的键值对 需要创建streams-plaintext-input等topic
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
//生成一个新的分组流,然后可以由count运算符聚合
.groupBy((key, value) -> value)
//count运算符有一个Materialized参数,该参数指定应将运行计数存储在名为 的状态存储中counts-store。这个Countsstore 可以实时查询
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
//返回值不再是String类型,需要对Long类型提供重写的序列化方法
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
/*
* Sub-topologies:
* Sub-topology: 0
* Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
* Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
* Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
* Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
* Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
* Sub-topology: 1
* Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
* Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
* Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
* Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
*/
KafkaStreams streams = new KafkaStreams(topology, props);
//添加CountdownLatch在终止该程序时关闭客户端
final CountDownLatch latch = new CountDownLatch(1);
//若虚拟机停止kafka如Ctrl+C,关闭kafka流
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
//调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
latch.await();
} catch (Throwable e) {
//0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
System.exit(1);
}
System.exit(0);
}
}
(4)生产者消费者你可以直接在linux上用第4、5步中的命令,也可以在java中自己创建相应的类,比如下面
package com.example.springb_web.utils.Kafka;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 0.配置一系列参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");//kafka集群,broker-list
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);//重试次数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//等待时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator缓冲区大小
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 1.创建一个生产者对象
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 2.调用send方法
for (int i = 0; i < 10; i++) {
//回调函数在Producer收到ack时异步调用
/*producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), new Callback() {
//回调函数在Producer收到ack时异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功->" + metadata.offset());
} else {
exception.printStackTrace();
}
}
});*/
producer.send(new ProducerRecord<String, String>("TextLinesTopic", Integer.toString(i), Integer.toString(i)), (metadata, exception) -> {
if (exception == null) {
System.out.println("消息发送成功->" + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
// 3.关闭生产者
producer.close();
}
}
package com.example.springb_web.utils.Kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
//kafka消费消息的方式,kafka是使用pull(拉)模式从broker中读取数据,根据消费者的消费能力以适当的速率消费broker里的消息
//消费者消费数据的核心点在于offset的维护
public class KafkaConsumerTest {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//自动提交offset,默认为true,由于是根据时间来自动提交的,因此是出了问题之后完全不可控的,因此在实际生产中不常使用
//如果先消费数据后提交offset,这时候如果在提交offset的时候挂掉了,后来恢复后,会重复消费那条offset的数据,这样会数据重复,但也就是保证了数据的最少一次性
//如果先提交offset后消费数据,这时候如果在提交offset的时候挂掉了,后来恢复后,那部分offset虽然提交了,但其实是没有消费的,因此就照成了数据的丢失,但是不会重复,也就保证了数据的最多一次性(at most once)。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//消费者组ID,只要group.id相同,就属于同一个消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groud18");
//创建1个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅主题topic
//consumer.subscribe(Arrays.asList("WordsWithCountsTopic"));
consumer.subscribe(Arrays.asList("TextLinesTopic"));
while (true) {
//如果当前没有数据可供消费,消费者会等待100ms之后再返回数据(批量)
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//offset相当于序号,标记每一条数据的位置,consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费
System.out.println("----------------------------topic = " + record.topic()
+ " offset = " + record.offset() + " value = " + record.value());
}
consumer.commitAsync();
// consumer.commitSync();
}
}
}
(5)验证测试
①首先验证生产者消费者,先启动KafkaConsumerTest,可以看到下面的日志一直在滚动,再启动KafkaProducerTest,可以看到消费者打出了消费日志(眼尖一点,如果没找到就打个断点看)
②Pipe
开启Pipe.java,生产者中topic改为streams-plaintext-input,消费者topic改为
streams-pipe-output ,为了省眼睛,我这里就用linux直接操作好了。。
生产者
bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server 192.168.124.128:9092
消费者
bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server 192.168.124.128:9092
可以看到消费者成功输出
③LineSplit(效果把输入分词换行后输出。如生产者输入hello world,消费者为hello world两次输出)
生产者同上,消费者将streams-pipe-output换成streams-linesplit-output,开启LineSplit,生产者输入hello world可以看到结果如下:
④Wordcount(分词并统计出现单词次数)
生产者同上,开启Wordcount,
这里注意,因为输出的结果不是String而且Long,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer,不然无法看到正常的output。生产者输入hello world后,执行下面操作
bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.128:9092\
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
可以看到结果如下,这里我重复输入了好几遍,所以统计的hello共有四次:
8.关闭kafka
9.相关参数解释
partions:主题分区数。kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上(如两个broker,副本因子为1,四个分区,则每个broker分配两分区),
当只有一个broker时,所有的分区就只分配到该Broker上。消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。
分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。
replication-factor:副本因子,用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
若partions 设置为10,replicationFactor设置为1. Broker为2.分区会均匀在broker。broker1分区为13579,broker2为246810;
若partions 设置为10,replicationFactor设置为2. Broker为2.每个broker都有副本存在。broker1和broker2副本均为1到10;
若partions 设置为3,replicationFactor设置为1. Broker为3.每个broker都有副本存在。broker1分区为1,broker2为2,broker2为3,当一个broker宕机了,该topic就无法使用了;
若partions 设置为3,replicationFactor设置为2. Broker为3.每个broker都有副本存在。broker1分区为12,broker2为23,broker2为13,当一个broker宕机了,该topic还能使用了。
可以理解平均每个broker分区数=partions*replication-factor/broker数