Kafka Stream实战教程
1. Kafka Streams 基础入门
1.1 什么是 Kafka Streams
Kafka Streams 是 Kafka 生态中用于 处理实时流数据 的一款轻量级流处理库。它利用 Kafka 作为数据来源和数据输出,可以让开发者轻松地对实时数据进行处理,比如计数、聚合、过滤等操作。Kafka Streams 的一个显著特点是其设计简洁,帮助我们快速构建和部署实时流处理应用,而不需要复杂的集群管理。
对比传统流处理框架(如 Spark Streaming):传统流处理框架通常需要独立的集群支持,并有较重的计算资源需求。而 Kafka Streams 内置在 Kafka 中,既不需要单独的集群支持,性能上也更轻量,适合需要实时响应的场景,比如在线日志监控、实时订单处理等。
Kafka Streams 的应用场景
- 实时数据分析:如热门商品实时排名、网站的热点数据追踪
- 实时监控和告警:如系统指标监控,异常行为检测
- 数据清洗与格式转换:如从原始数据中抽取特定字段、转换格式用于下游系统
- 复杂事件处理:如订单状态跟踪、用户行为关联分析
1.2 Kafka Streams 核心概念
要理解 Kafka Streams,先了解几个核心概念:
-
Stream(数据流):一个数据流是源源不断的数据记录流(类似于消息流)。在 Kafka 中,每个数据流对应 Kafka 的一个主题(topic)。
-
Table(表):类似于数据库中的表,是数据的快照,通常包含每个键的最新状态。Kafka Streams 通过将流(Stream)聚合为表(Table),提供了在实时数据上进行去重和合并的能力。
-
KStream 和 KTable
- KStream:一个记录的无状态流,适合用于过滤、转换等操作,适合处理简单的逐条消息处理。
- KTable:类似于数据库的表,有键值对的结构,适合做聚合、去重、统计等操作。
- 两者可以互相转换,比如可以将一个 KStream 聚合成 KTable,也可以从 KTable 中生成 KStream。
-
时间语义:Kafka Streams 提供了事件时间(Event Time)、处理时间(Processing Time)、摄取时间(Ingestion Time)三种时间语义,帮助用户更灵活地处理时序数据。
-
状态存储和窗口(Windows):Kafka Streams 提供内置的状态存储来保存流的中间状态,如用户登录状态等。窗口操作(windowing)允许我们在一定的时间间隔内对流数据进行聚合和分组操作,比如每 5 分钟统计一次某产品的点击量。
流表二元性描述了流和表之间的紧密关系。
- 流作为表:流可以被视为表的变更日志,其中流中的每个数据记录都捕获表的状态变化。因此,流是伪装的表,可以通过从头到尾重放变更日志来重建表,从而轻松地将其转换为“真实”表。同样,在更一般的类比中,聚合流中的数据记录(例如从页面浏览事件流中计算用户的总页面浏览量)将返回一个表(此处的键和值分别是用户及其对应的页面浏览量)。
- 表作为流:表可以被视为某个时间点的快照,是流中每个键的最新值(流的数据记录是键值对)。因此,表是伪装的流,通过迭代表中的每个键值条目,可以轻松地将其转换为“真实”流。
kafka文档
1.3 开发环境搭建
搭建 Kafka Streams 开发环境的步骤如下:
-
安装 Kafka:
- 下载安装 Kafka,然后启动 Kafka 服务和 Zookeeper 服务。
- 常用命令:启动 Kafka 服务器,
bin/kafka-server-start.sh config/server.properties
-
创建 Kafka Streams 项目:
-
新建一个 Maven 或 Gradle 项目,并添加 Kafka Streams 的依赖:
<!-- Maven 依赖 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.0.0</version> </dependency>
-
-
开发Hello Kafka Streams 应用:
- 创建一个简单的 Kafka Streams 应用,读取输入流,进行简单的数据处理,然后输出结果。
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import java.util.Properties; public class HelloKafkaStreams { public static void main(String[] args) { // 配置 Kafka Streams Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde"); // 构建流处理拓扑 StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> inputStream = builder.stream("input-topic"); // 进行简单的处理,比如将消息转换为大写 KStream<String, String> processedStream = inputStream.mapValues(value -> value.toUpperCase()); // 将处理后的流写入输出主题 processedStream.to("output-topic"); // 创建并启动 Kafka Streams KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); // 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
-
运行 Kafka Streams 应用:
- 确保 Kafka 服务已启动,运行该应用,将消息发到“input-topic”主题,观察“output-topic”主题中的转换结果。
完成以上步骤后,你已经实现了第一个简单的 Kafka Streams 应用。这个应用读取“input-topic”中的消息,将其内容转换为大写后写入“output-topic”中。
2. Kafka Streams 实现原理
在理解和使用 Kafka Streams 进行流处理之前,深入了解其实现原理可以帮助我们更好地优化应用性能和处理策略。Kafka Streams 作为一个轻量级、分布式的数据处理库,提供了流处理的易用性和强大的实时性。这一节将介绍 Kafka Streams 的实现原理,包括其架构设计和核心组件。
1. Kafka Streams 架构概述
Kafka Streams 是构建在 Kafka 消息系统之上的一个流处理库,它提供了一些特性,使得其容易集成到现有的 Kafka 基础设施中进行实时数据流的处理。Kafka Streams 的主要组成部分包括:
- 流处理拓扑(Topology):描述了应用中各个流处理过程的图结构,包括数据的源、处理逻辑和输出。
- 任务(Tasks):一个 Kafka Streams 应用程序被分配为多个任务,每个任务负责处理特定的分区数据。
- 线程模型:每个 Kafka Streams 实例可以通过配置线程数来实现并行处理。
2. 核心组件
1. 流处理拓扑(Topology)
流处理的核心是通过定义流处理拓扑来实现的。拓扑由多个处理节点(Processor)、source 和 sink 组成。每个节点负责执行特定的数据转换逻辑。
- **Source Processor **:从 Kafka 主题读取数据。
- Processor Node:应用具体的数据处理逻辑,如过滤、转换、聚合等。
- **Sink Processor **:将处理结果输出到 Kafka 主题。
kafka stream core-concepts
Stream Processing Topology
- A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
- A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
- A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
There are two special processors in the topology:
- Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
- Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.
Example:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
// Data processing logic
KStream<String, String> processed = source.filter((key, value) -> value.contains("important"));
processed.to("output-topic");
2. 状态存储(State Store)
Kafka Streams 支持有状态流处理,使用状态存储(如 RocksDB)来保存中间结果。每个处理节点都可以维护自己的状态,以便实现如计数、聚合等操作。
- Persistent State Store:通过内存和磁盘存储队列实现持久化。
- Changelog Topics:每次对状态的更新都会被记录到 Kafka 中的 changelog 主题,确保数据的恢复能力。
3. 时间语义
Kafka Streams 提供了三种时间语义,用于进行窗口化的流分析:
- Event Time:事件或数据记录发生的时间点,即最初在“源头”创建的时间点。**例如:**如果事件是汽车中的 GPS 传感器报告的地理位置变化,则相关事件时间将是 GPS 传感器捕获位置变化的时间。
- Processing Time: 事件或数据记录恰好被流处理应用程序处理的时间点,即记录被使用的时间点。处理时间可能比原始事件时间晚几毫秒、几小时或几天等。
- Ingestion Time:事件被记录进入 Kafka 的时间。
4. 错误处理
通过自定义的异常处理机制(如 DeserializationExceptionHandler
),Kafka Streams 能够继续处理其余数据而不影响整体流程。
3. 任务执行
Kafka Streams 将应用程序拓扑根据 Kafka 主题的分区自动划分为多个任务(Task),这些任务可以在多个线程中并行执行。每个 Task 负责处理特定的分区数据,因此从根本上提高了水平扩展能力。
- 独立性:每个 Task 具有独立的状态和处理逻辑,与其他 Task 相互隔离。
- 自动负载均衡:当 Kafka Streams 实例的数量改变时,任务会自动重新分配,以实现负载均衡。
4. 线程与实例
-
线程配置:通过配置线程数,应用程序可以在单个实例中并行处理多个任务。
Properties props = new Properties(); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); // 设置应用程序使用两个线程
-
实例扩缩:多个实例共同构成 Stream 应用,可以水平扩展应用性能,实例之间通过协调协议共享状态。
总结
理解 Kafka Streams 的实现原理能够帮助我们更高效地开发和部署实时流应用。通过合理设计流处理拓扑、利用状态存储、制定抗故障策略,以及搭配适当的时间语义,Kafka Streams 能够有效地应对复杂的数据流处理场景。最终,这种深刻的理解可以在系统性能优化和调优中发挥关键作用。
3. Kafka Streams 的基础操作
在完成第一个 Kafka Streams 应用后,我们将进一步了解 Kafka Streams 的基础操作,重点关注一些常用的流数据处理方法,包括数据过滤、映射、聚合、分组、和窗口操作等。这些操作让我们可以针对不同业务需求进行丰富的流数据转换和处理。
3.1 基础操作方法概览
在 Kafka Streams 中,我们通常会用 KStream
和 KTable
来表示数据流。以下是一些常见的操作方法:
- 过滤(filter):筛选符合条件的记录
- 映射(map, mapValues):转换每条记录的键和值
- 分组(groupByKey, groupBy):将记录按指定键分组,为聚合操作做准备
- 聚合(count, reduce, aggregate):对记录进行汇总,如计数、求和等
- 窗口操作(windowedBy):按时间窗口进行分组聚合
3.2 数据过滤(Filter)
过滤操作允许我们筛选出符合条件的数据。例如,如果只想要某个主题中记录的特定字段,我们可以使用 filter
方法进行筛选。
示例:假设我们有一个主题 orders
,每条记录包含订单的信息。我们想要过滤出金额大于100的订单:
KStream<String, Order> ordersStream = builder.stream("orders");
// 过滤金额大于100的订单
KStream<String, Order> filteredOrders = ordersStream.filter(
(key, order) -> order.getAmount() > 100
);
filteredOrders.to("filtered-orders");
在此示例中,符合条件的订单将被写入 filtered-orders
主题。
3.3 数据映射(Map 和 MapValues)
映射操作用于修改流中的每条记录。Kafka Streams 提供了 map
和 mapValues
两种方法:
-
map
可以对记录的键和值进行转换; -
mapValues
只会对值进行转换,保留键不变。
示例:将每个订单的金额增加10%并保留其他信息:
KStream<String, Order> updatedOrders = ordersStream.mapValues(
order -> {
order.setAmount(order.getAmount() * 1.1);
return order;
}
);
updatedOrders.to("updated-orders");
这里我们用 mapValues
调整了每个订单的金额,更新后的订单数据会被写入 updated-orders
主题。
3.4 数据分组(GroupBy 和 GroupByKey)
分组操作将数据按指定键重新分组,通常用于聚合操作的前一步。分组后的数据会被存储在 KGroupedStream
中,便于后续的聚合操作。
-
groupByKey
:按现有键分组 -
groupBy
:可指定新的分组键
示例:按用户 ID 对订单数据进行分组:
KGroupedStream<String, Order> ordersByUser = ordersStream.groupBy(
(key, order) -> order.getUserId()
);
在这里,我们按用户 ID 重新分组,以便于在接下来的步骤中对每个用户的订单进行聚合。
3.5 数据聚合(Count、Reduce 和 Aggregate)
聚合操作用于计算分组数据的汇总信息,如计数、求和等。
- count:统计每组记录的数量
- reduce:可以实现自定义的聚合逻辑,例如最大值、最小值等
- aggregate:实现更灵活的聚合操作,可创建复杂的聚合结果
示例:计算每个用户的订单总金额
KTable<String, Double> totalAmountPerUser = ordersByUser.aggregate(
() -> 0.0, // 初始化值
(userId, order, total) -> total + order.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
totalAmountPerUser.toStream().to("total-amount-per-user");
这里我们使用 aggregate
方法,按用户 ID 统计每个用户的订单总金额,结果会被写入 total-amount-per-user
主题。
3.6 窗口操作(WindowedBy)
窗口操作用于在时间窗口内对流数据进行分组和聚合,非常适合处理时序数据,例如每小时统计一次销售数据。常用的窗口类型有:
- Tumbling Window:固定长度的窗口,不重叠
- Hopping Window:固定长度,允许窗口之间重叠
- Session Window:根据活动时间自动调整的窗口
示例:每隔5分钟统计一次订单数量
KTable<Windowed<String>, Long> orderCountByWindow = ordersByUser
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
orderCountByWindow.toStream().to("order-count-by-window");
在这个示例中,我们按5分钟窗口统计每个用户的订单数量,结果会被写入 order-count-by-window
主题。
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
final Merger<? super K, T> sessionMerger) {
return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty());
}
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
}
3.7 实战案例
案例1:订单流数据处理示例
我们将多个操作组合起来,创建一个实际的订单数据处理流程。
需求:对 orders
主题中的订单数据进行以下处理:
- 过滤出金额大于100的订单
- 按用户 ID 重新分组
- 计算每个用户过去1小时的订单数量(使用滚动窗口)
- 将结果写入
high-value-orders
和order-count-by-hour
主题
代码实现:
KStream<String, Order> ordersStream = builder.stream("orders");
// 1. 过滤金额大于100的订单
KStream<String, Order> highValueOrders = ordersStream.filter(
(key, order) -> order.getAmount() > 100
);
highValueOrders.to("high-value-orders");
// 2. 按用户 ID 分组
KGroupedStream<String, Order> ordersByUser = highValueOrders.groupBy(
(key, order) -> order.getUserId()
);
// 3. 每小时统计一次订单数量
KTable<Windowed<String>, Long> hourlyOrderCount = ordersByUser
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.count();
// 4. 将统计结果写入主题
hourlyOrderCount.toStream().to("order-count-by-hour");
通过以上步骤,我们利用 Kafka Streams 的基础操作完成了一个流数据的实时处理任务。
案例 2:销售额实时统计
本案例将带大家了解如何利用 Kafka Streams 实现销售额的实时统计。假设我们有一个主题 sales
,每条记录包含一个订单的销售信息,我们将计算每个商品的实时总销售额和每小时的销售额。
需求分析
我们需要从 sales
主题中读取订单记录,并进行以下处理:
- 过滤出金额大于0的有效订单;
- 按商品 ID 分组计算每个商品的总销售额;
- 对每个商品进行时间窗口统计,计算每小时的销售额;
- 将实时总销售额和每小时的销售额写入不同的 Kafka 主题。
步骤详解
以下是每个步骤的详细实现和代码示例。
步骤 1:过滤有效订单
我们首先从 sales
主题中读取订单流,并过滤掉销售金额小于或等于0的无效订单记录。
KStream<String, SaleOrder> salesStream = builder.stream("sales");
// 过滤出有效的销售记录
KStream<String, SaleOrder> validSalesStream = salesStream.filter(
(key, saleOrder) -> saleOrder.getAmount() > 0
);
在这个代码片段中,我们读取 sales
主题中的数据,使用 filter
方法筛选出 amount
大于0的有效销售记录。
步骤 2:按商品 ID 计算总销售额
接下来,我们将按商品 ID 对订单流重新分组,并计算每个商品的总销售额。
KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(
(key, saleOrder) -> saleOrder.getProductId()
);
KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(
() -> 0.0, // 初始化值
(productId, saleOrder, total) -> total + saleOrder.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
totalSalesByProduct.toStream().to("total-sales-by-product");
在这段代码中:
- 我们按商品 ID 分组;
- 使用
aggregate
方法为每个商品累计销售额; - 将计算出的每个商品的总销售额结果写入
total-sales-by-product
主题。
步骤 3:按小时计算每个商品的销售额
我们为每个商品创建一个滚动窗口,每小时计算一次销售额。这有助于我们按时间区间了解每个商品的销售趋势。
KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(productId, saleOrder, total) -> total + saleOrder.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
hourlySalesByProduct.toStream().to("hourly-sales-by-product");
在这段代码中:
-
windowedBy
方法定义了一个每小时的时间窗口; -
aggregate
计算每小时的销售额; - 结果数据会写入
hourly-sales-by-product
主题,其中窗口包含商品 ID 和每小时的销售额。
步骤 4:综合输出
将上述两种统计结果分别输出到 total-sales-by-product
和 hourly-sales-by-product
主题中,消费者可以订阅这两个主题,获取商品的实时销售额及每小时的销售额动态变化。
完整代码示例
将上述步骤组合成完整的 Kafka Streams 程序代码如下:
StreamsBuilder builder = new StreamsBuilder();
// 1. 从 'sales' 主题读取数据
KStream<String, SaleOrder> salesStream = builder.stream("sales");
// 2. 过滤有效的销售记录
KStream<String, SaleOrder> validSalesStream = salesStream.filter(
(key, saleOrder) -> saleOrder.getAmount() > 0
);
// 3. 按商品 ID 计算总销售额
KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(
(key, saleOrder) -> saleOrder.getProductId()
);
KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(
() -> 0.0,
(productId, saleOrder, total) -> total + saleOrder.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
totalSalesByProduct.toStream().to("total-sales-by-product");
// 4. 按小时计算每个商品的销售额
KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(productId, saleOrder, total) -> total + saleOrder.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
hourlySalesByProduct.toStream().to("hourly-sales-by-product");
// 启动流处理应用程序
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
总结
通过该案例,我们完成了:
- 使用
fil