Kafka修炼日志(三):Streams简明使用教程

Streams是Kafka 10版本新增的功能,用于实时处理存储与Kafka服务器的数据,并将处理后的结果推送至指定的Topic中,供后续使用者使用。

      下面结合官方教程详述如何使用Streams实时分析处理数据,教程的Demo是一个单词计数器:

(1)首先使用Kafka Topic创建命令创建一个用于生产消息的Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 3 --partitions 1 --topic streams-file-input

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

 编写一个消息测试文件file-input.txt,作为消息生产的输入。

[root@localhost kafka_2.12-0.10.2.0]# echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

 (2)在Console上启动一个消息生产者,将file-input的消息生产到Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt &

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

(3)使用示例Demo:Word Count,分析处理消息,并将处理后的消息推送至Topic:streams-wordcount-output。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

(4)在Console中启动一个消费者,验证消息的处理结果,输出为所有单词的计数列表。 

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

(5)消费Topic:streams-wordcount-output中的消息,输出结果,这里计数是双倍,是因为我将消息的生产、处理分别运行了两次,这里是累计结果。

[root@localhost kafka_2.12-0.10.2.0]# all       2
lead    2
to      2
hello   2
streams 4
join    2
kafka   6
summit  2

Kafka修炼日志(三):Streams简明使用教程Kafka修炼日志(三):Streams简明使用教程

Kafka Streams处理原理

    说明下Kafka Streams的处理过程,以上面的Word Count为例,首先看下图(图片来自Kafka官方网站),KTable为单词计数的实时状态,KStream为每次更新的单词,将更新状态输入到KTable,并推送至消息处理结果指定的Topic。

Kafka修炼日志(三):Streams简明使用教程

    下图(图片来自Kafka官方网站)展示了每输入一句单词,已经存在的单词会累计计数,同时KTable会更新不存在的单词。

Kafka修炼日志(三):Streams简明使用教程

本文属作者原创,转贴请声明!

上一篇:《Adobe Photoshop CC经典教程》—第2课2.8节使用Clone Stamp工具修复区域


下一篇:ASP.NET Core 新增用户 - ASP.NET Core 基础教程 - 简单教程,简单编程