上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。
假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量。
我们将检测的结果分为三个等级:
- 正常:流量在预设的正常范围内;
- 警告:某数据中心在10秒内连续两次上报的流量超过认定的正常值;
- 报警:某数据中心在30秒内连续两次匹配警告;
首先,我们构建source,这里我们选择的是并行source,因此需要继承RichParallelSourceFunction类。所有的数据通过模拟器随机生成,其中数据中心编号为整型且取值范围为[0, 10),数据生成的事件间隔由PAUSE常量指定,默认为100毫秒:
//parallel source
DataStream<MonitorEvent> inputEventStream = env.addSource(
new MonitorEventSource(
MAX_DATACENTER_ID,
STREAM_STD,
STREAM_MEAN,
PAUSE
)
).assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());
下面,我们来构建警告模式,按照我们设定的警告等级,其模式定义如下:
Pattern<MonitorEvent, ?> warningPattern = Pattern.<MonitorEvent>begin("first")
.subtype(NetworkStreamEvent.class)
.where(evt -> evt.getStream() >= STREAM_THRESHOLD)
.next("second")
.subtype(NetworkStreamEvent.class)
.where(evt -> evt.getStream() >= STREAM_THRESHOLD)
.within(Time.seconds(10));
根据该模式构建模式流:
PatternStream<MonitorEvent> warningPatternStream =
CEP.pattern(inputEventStream.keyBy("dataCenterId"), warningPattern);
在警告的模式流中筛选出配对的警告事件对,生成警告事件对象流(告警事件对象会算出,前后两个匹配的流量事件的平均值):
DataStream<NetworkStreamWarning> warnings = warningPatternStream.select(
(Map<String, MonitorEvent> pattern) -> {
NetworkStreamEvent first = (NetworkStreamEvent) pattern.get("first");
NetworkStreamEvent second = (NetworkStreamEvent) pattern.get("second");
return new NetworkStreamWarning(first.getDataCenterId(),
(first.getStream() + second.getStream()) / 2);
}
);
按照设定的等级,告警模式定义如下:
Pattern<NetworkStreamWarning, ?> alertPattern = Pattern.<NetworkStreamWarning>
begin("first").next("second").within(Time.seconds(30));
在警告事件流中应用告警模式,得到告警模式流:
PatternStream<NetworkStreamWarning> alertPatternStream = CEP.pattern(warnings.keyBy
("dataCenterId"), alertPattern);
在告警模式流中匹配警告模式对,如果模式对中第一个警告对象的平均流量值小于第二个警告对象的平均流量值,则构建告警对象并输出该对象从而形成告警流:
DataStream<NetworkStreamAlert> alerts = alertPatternStream.flatSelect(
(Map<String, NetworkStreamWarning> pattern, Collector<NetworkStreamAlert> out) -> {
NetworkStreamWarning first = pattern.get("first");
NetworkStreamWarning second = pattern.get("second");
//first avg < second avg
if (first.getAverageStream() < second.getAverageStream()) {
out.collect(new NetworkStreamAlert(first.getDataCenterId()));
}
}
);
最终,sink到控制台:
warnings.print();
alerts.print();
从上面的代码段可见,CEP的关键是定义合适的模式。关于模式的相关的API,我们之前已进行过分析。为了节省篇幅,本文只列出了核心代码片段。
需要注意的是,因为包含Java 8的lambdas,当你使用javac作为编译器时,将会得到错误提示:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
The generic type parameters of 'Map' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to
use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1331)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1317)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:347)
at org.apache.flink.cep.PatternStream.select(PatternStream.java:81)
at com.diveintoapacheflink.chapter11.NetworkAttackMonitor.main(NetworkAttackMonitor.java:55)
at ...
解决方案是使用Eclipse JDT来编译代码。