https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
The goal is to match a pattern sequence.
A pattern sequence is made up of multiple patterns.
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
        }
    );
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> pattern) throws Exception {
            return createAlertFrom(pattern);
        }
    });
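As in the example, this pattern sequence consists of three patterns: begin, next, and followedBy.
The first pattern of every pattern sequence is begin.
Every pattern needs a unique name, such as start, middle, and end here.
Each pattern can also have a condition, set for example with where.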
Pattern
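Patterns fall into two kinds: individual patterns and complex patterns.
An individual pattern is either a singleton pattern or a looping one.
Put simply, a singleton pattern accepts exactly one event, while a looping pattern may accept several; in the finite automaton, matching the same pattern repeatedly forms a loop.
For example, in a b+ c? d,
b+ is looping and all the others are singletons.
Adding a quantifier to a singleton pattern turns it into a looping pattern.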
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 0 or more occurrences
start.oneOrMore().optional();
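The quantifiers read much like regular-expression quantifiers. As a loose analogy only, the sketch below uses java.util.regex rather than the Flink CEP API, with one character standing for one event; the class name QuantifierAnalogy is made up for illustration.

```java
import java.util.regex.Pattern;

// Analogy only: regex quantifiers standing in for CEP quantifiers.
// Each character stands for one event; no Flink API is used here.
class QuantifierAnalogy {

    // Returns true if the whole event string matches the regex.
    static boolean matches(String regex, String events) {
        return Pattern.matches(regex, events);
    }

    public static void main(String[] args) {
        System.out.println(matches("a{4}", "aaaa"));    // like times(4)
        System.out.println(matches("(a{4})?", ""));     // like times(4).optional()
        System.out.println(matches("a+", "aaa"));       // like oneOrMore()
        System.out.println(matches("a*", ""));          // like oneOrMore().optional()
        System.out.println(matches("ab+c?d", "abbd"));  // like a b+ c? d
    }
}
```

The analogy stops at counting: a regex always matches adjacent characters, whereas a CEP looping pattern also has a contiguity mode.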
Contiguity can be defined for the repeated matches of a single pattern.
To illustrate this with an example, a pattern sequence "a+ b" (one or more "a"s followed by a "b") with input "a1", "c", "a2", "b" will have the following results:
- Strict Contiguity: {a2 b} – the "c" after "a1" causes "a1" to be discarded.
- Relaxed Contiguity: {a1 b} and {a1 a2 b} – "c" is simply ignored.
- Non-Deterministic Relaxed Contiguity: {a1 b}, {a2 b}, and {a1 a2 b}.
For looping patterns (e.g. oneOrMore() and times()) the default is relaxed contiguity. If you want strict contiguity, you have to explicitly specify it by using the consecutive() call, and if you want non-deterministic relaxed contiguity you can use the allowCombinations() call.
An example using consecutive():
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("c");
    }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("a");
    }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("b");
    }
});
Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B
with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}
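This is the contiguity within a single pattern; contiguity between patterns can also be defined, as described later.
Of course, the key part of a pattern is its conditions.
A condition decides whether an event counts as a match.
There are several kinds of conditions: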
Simple Conditions
start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});
This is easy to understand: the decision depends only on the current event.
Iterative Conditions
This is how you can specify a condition that accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them.
That is, deciding whether the condition holds requires looking at the events the pattern has already matched, hence the name iterative.
Below is the code for an iterative condition that accepts the next event for a pattern named “middle” if its name starts with “foo”, and if the sum of the prices of the previously accepted events for that pattern plus the price of the current event do not exceed the value of 5.0. Iterative conditions can be very powerful, especially in combination with looping patterns, e.g. oneOrMore().
middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }
        double sum = value.getPrice();
        for (Event event : ctx.getEventsForPattern("middle")) {
            sum += event.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});
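First, this is a oneOrMore pattern, so it can match one or more events. For each candidate event, besides checking that its name starts with "foo",
the condition also checks that the sum of its price and the prices of the previously matched events is less than 5.
The key call is ctx.getEventsForPattern, which returns all events matched so far by the pattern with the given name.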
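To see how the running sum gates each new event, here is a minimal plain-Java sketch of the same check. It assumes a single greedy run instead of the NFA's branching, and the Event class and method names are hypothetical stand-ins, not Flink types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the iterative condition; it does not use the Flink API.
class IterativeConditionSketch {

    // Hypothetical event type with only the two fields the condition needs.
    static final class Event {
        final String name;
        final double price;

        Event(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Same test as the filter above: the name must start with "foo" and the
    // candidate's price plus all previously accepted prices must stay below 5.0.
    static boolean accept(Event candidate, List<Event> alreadyAccepted) {
        if (!candidate.name.startsWith("foo")) {
            return false;
        }
        double sum = candidate.price;
        for (Event e : alreadyAccepted) {
            sum += e.price;
        }
        return Double.compare(sum, 5.0) < 0;
    }

    // Feeds events in order and keeps those the condition accepts.
    static List<String> acceptedNames(List<Event> input) {
        List<Event> accepted = new ArrayList<>();
        List<String> names = new ArrayList<>();
        for (Event e : input) {
            if (accept(e, accepted)) {
                accepted.add(e);
                names.add(e.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
            new Event("foo1", 2.0), new Event("bar", 1.0),
            new Event("foo2", 2.5), new Event("foo3", 1.0));
        System.out.println(acceptedNames(input));
    }
}
```

With this input, foo3 is rejected because 2.0 + 2.5 + 1.0 reaches 5.5, while bar is rejected on its name alone.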
Combining Conditions
pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});
A pattern can have multiple conditions: chained where calls have AND semantics, while or has OR semantics.
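The AND/OR composition can be pictured with java.util.function.Predicate. This is only a sketch: the three conditions are invented, and the grouping (a AND b) OR c for where(a).where(b).or(c) is my assumption about how chained calls compose, not something the docs quoted here spell out.

```java
import java.util.function.Predicate;

// Sketch of combined conditions using plain predicates over event names.
class CombinedConditionSketch {

    // Roughly where(startsWithFoo).where(isShort).or(isBar),
    // assuming the chain groups as (startsWithFoo AND isShort) OR isBar.
    static Predicate<String> condition() {
        Predicate<String> startsWithFoo = name -> name.startsWith("foo");
        Predicate<String> isShort = name -> name.length() <= 4;
        Predicate<String> isBar = name -> name.equals("bar");
        return startsWithFoo.and(isShort).or(isBar);
    }

    public static void main(String[] args) {
        Predicate<String> c = condition();
        System.out.println(c.test("foo1"));     // both where-conditions hold
        System.out.println(c.test("foolong"));  // second where-condition fails
        System.out.println(c.test("bar"));      // accepted through the or-branch
    }
}
```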
Pattern Sequence
A sequence is made up of multiple patterns, so how do these patterns relate to one another?
A pattern sequence has to start with an initial pattern, as shown below:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
Every sequence must have a starting point, begin.
Next, you can append more patterns to your pattern sequence by specifying the desired contiguity conditions between them:
- next(), for strict,
- followedBy(), for relaxed, and
- followedByAny(), for non-deterministic relaxed contiguity.
or
- notNext(), if you do not want an event type to directly follow another
- notFollowedBy(), if you do not want an event type to be anywhere between two other event types
After begin, further patterns can be appended; the contiguity relations between them are the ones listed above.
As an example, a pattern a b, given the event sequence "a", "c", "b1", "b2", will give the following results:
- Strict Contiguity between a and b: {} (no match) – the "c" after "a" causes "a" to be discarded.
- Relaxed Contiguity between a and b: {a b1} – as relaxed contiguity is viewed as “skip non-matching events till the next matching one”.
- Non-Deterministic Relaxed Contiguity between a and b: {a b1}, {a b2} – as this is the most general form.
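The three results for the pattern a b can be reproduced with a small brute-force matcher. This is a sketch of the semantics as described in the example, not of how Flink's NFA works; the class, enum, and the rule "names starting with a or b" are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Brute-force sketch of the contiguity modes for the two-step sequence "a b".
// Names starting with "a" match the first pattern, names starting with "b" the second.
class ContiguitySketch {

    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC }

    static List<String> match(List<String> events, Mode mode) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            String a = events.get(i);
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (isB) {
                    results.add(a + " " + events.get(j));
                }
                // STRICT looks only at the immediately following event.
                // RELAXED stops at the first match; NON_DETERMINISTIC keeps scanning.
                if (mode == Mode.STRICT || (mode == Mode.RELAXED && isB)) {
                    break;
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "c", "b1", "b2");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + match(input, mode));
        }
    }
}
```

In terms of the API, STRICT corresponds to next(), RELAXED to followedBy(), and NON_DETERMINISTIC to followedByAny().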
Temporal Constraint
A sequence can also be given a time constraint, supported for both processing and event time.
next.within(Time.seconds(10));
Detecting Patterns
Once the pattern sequence is defined, we need to actually run detection:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
This produces a PatternStream.
The input stream can be keyed or non-keyed depending on your use-case
Applying your pattern on a non-keyed stream will result in a job with parallelism equal to 1
With a non-keyed stream, the parallelism can only be 1.
With a keyed stream, patterns are detected independently for each key, so the job can run in parallel.
Once you have obtained a PatternStream you can select from detected event sequences via the select or flatSelect methods.
A PatternStream can be consumed with either of:
PatternSelectFunction
PatternFlatSelectFunction
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, List<IN>> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}
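A PatternSelectFunction must implement the select method.
Its parameter, Map<String, List<IN>> pattern, is one successfully matched pattern sequence: the key is the pattern name, and the value is a list because a looping pattern may have matched several events.
PatternFlatSelectFunction differs only in that its method also takes a Collector, so it can emit multiple results.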
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void select(Map<String, List<IN>> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}
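The difference between the two shapes, one result per match versus any number of results per match, can be shown without Flink. The sketch below uses a plain Map for the match and a java.util.function.Consumer in place of Collector; the class name and the string results are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of the select / flatSelect shapes over a match keyed by pattern name.
class SelectSketch {

    // Like select(): exactly one result per matched sequence.
    static String select(Map<String, List<String>> match) {
        return match.get("start").get(0) + "->" + match.get("end").get(0);
    }

    // Like flatSelect(): zero or more results, handed to a collector.
    // Here one result is emitted per event matched by the looping "middle" pattern.
    static void flatSelect(Map<String, List<String>> match, Consumer<String> collector) {
        for (String middle : match.get("middle")) {
            collector.accept(match.get("start").get(0) + "->" + middle);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new HashMap<>();
        match.put("start", Arrays.asList("s"));
        match.put("middle", Arrays.asList("m1", "m2"));
        match.put("end", Arrays.asList("e"));

        System.out.println(select(match));
        List<String> out = new ArrayList<>();
        flatSelect(match, out::add);
        System.out.println(out);
    }
}
```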
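Source Code
Start with the pattern definition: although defining patterns is fairly involved, the implementation is simple.
In the end,
org.apache.flink.cep.nfa.compiler.NFACompiler
converts the pattern sequence into an NFA (non-deterministic finite automaton). Most of the sequence-matching logic lives in the NFA, so it is not covered in detail here.
Eventually patternStream.select is called to produce the result stream: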
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
    SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
            CEPOperatorUtils.createPatternStream(inputStream, pattern);
    return patternStream.map(
        new PatternSelectMapper<>(
            patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
        .returns(outTypeInfo);
}
CEPOperatorUtils.createPatternStream
if (inputStream instanceof KeyedStream) {
    // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
    KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
    TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
    patternStream = keyedStream.transform(
        "KeyedCEPPatternOperator",
        (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
        new KeyedCEPPatternOperator<>(
            inputSerializer,
            isProcessingTime,
            keySerializer,
            nfaFactory,
            true));
} else {
    KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
    TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
    patternStream = inputStream.keyBy(keySelector).transform(
        "CEPPatternOperator",
        (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
        new KeyedCEPPatternOperator<>(
            inputSerializer,
            isProcessingTime,
            keySerializer,
            nfaFactory,
            false
        )).forceNonParallel();
}
The key step is creating a KeyedCEPPatternOperator:
public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, List<IN>>>
AbstractKeyedCEPPatternOperator
The most important question is how a StreamRecord is handled when it arrives:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        // there can be no out of order elements in processing time
        NFA<IN> nfa = getNFA();
        processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
        updateNFA(nfa);
    } else {
        long timestamp = element.getTimestamp();
        IN value = element.getValue();
        // In event-time processing we assume correctness of the watermark.
        // Events with timestamp smaller than the last seen watermark are considered late.
        // Late events are put in a dedicated side output, if the user has specified one.
        if (timestamp >= lastWatermark) { // only non-late records are processed
            // we have an event with a valid timestamp, so
            // we buffer it until we receive the proper watermark.
            saveRegisterWatermarkTimer();
            List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
            if (elementsForTimestamp == null) {
                elementsForTimestamp = new ArrayList<>();
            }
            if (getExecutionConfig().isObjectReuseEnabled()) {
                // copy the StreamRecord so that it cannot be changed
                elementsForTimestamp.add(inputSerializer.copy(value));
            } else {
                elementsForTimestamp.add(element.getValue());
            }
            elementQueueState.put(timestamp, elementsForTimestamp);
        }
    }
}
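As shown, in processing time this is very simple: the record goes straight to the NFA.
In event time it is more complicated, because out-of-order records must be handled and cannot be passed to the NFA directly.
They have to be buffered, so look at elementQueueState: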
private transient MapState<Long, List<IN>> elementQueueState;
elementQueueState = getRuntimeContext().getMapState(
    new MapStateDescriptor<>(
        EVENT_QUEUE_STATE_NAME,
        LongSerializer.INSTANCE,
        new ListSerializer<>(inputSerializer)
    )
);
elementQueueState records the list of records for each timestamp.
onEventTime triggers processing of the data in elementQueueState:
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in event time order by feeding them in the NFA
    // 3) advance the time to the current watermark, so that expired patterns are discarded.
    // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
    //    have state to be used later.
    // 5) update the last seen watermark.
    // STEP 1
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps(); // sort the keys of elementQueueState by time; PriorityQueue is a heap
    NFA<IN> nfa = getNFA();
    // STEP 2
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) { // peek returns the smallest timestamp; process it if it is <= currentWatermark
        long timestamp = sortedTimestamps.poll();
        for (IN element: elementQueueState.get(timestamp)) { // process the record list stored for this timestamp
            processEvent(nfa, element, timestamp);
        }
        elementQueueState.remove(timestamp);
    }
    // STEP 3
    advanceTime(nfa, timerService.currentWatermark());
    // STEP 4
    if (sortedTimestamps.isEmpty()) {
        elementQueueState.clear();
    }
    updateNFA(nfa);
    if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
        saveRegisterWatermarkTimer();
    }
    // STEP 5
    updateLastSeenWatermark(timerService.currentWatermark()); // update lastWatermark
}
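The buffer-then-release idea behind processElement and onEventTime can be sketched in plain Java. A HashMap stands in for the MapState and strings stand in for events; the class and method names are hypothetical, and NFA processing is replaced by simply returning the released events in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Sketch of event-time buffering: hold events per timestamp and release them
// in timestamp order once the watermark has passed them.
class EventTimeBufferSketch {

    private final Map<Long, List<String>> elementQueue = new HashMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    // Mirrors the event-time branch of processElement:
    // late events (timestamp < lastWatermark) are not buffered.
    boolean buffer(long timestamp, String event) {
        if (timestamp < lastWatermark) {
            return false;
        }
        elementQueue.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        return true;
    }

    // Mirrors steps 1, 2 and 5 of onEventTime: sort the buffered timestamps,
    // release everything at or below the watermark, then remember the watermark.
    List<String> advanceWatermark(long watermark) {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>(elementQueue.keySet());
        List<String> released = new ArrayList<>();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
            long timestamp = sortedTimestamps.poll();
            released.addAll(elementQueue.remove(timestamp));
        }
        lastWatermark = watermark;
        return released;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch sketch = new EventTimeBufferSketch();
        sketch.buffer(5, "e5");
        sketch.buffer(3, "e3");   // arrives out of order
        sketch.buffer(9, "e9");
        System.out.println(sketch.advanceWatermark(6));
        System.out.println(sketch.buffer(4, "late"));
        System.out.println(sketch.advanceWatermark(10));
    }
}
```

Events arrive as 5, 3, 9 but are released as e3, e5 once the watermark reaches 6, which is the ordering the NFA needs.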
When is onEventTime called?
AbstractStreamOperator has an
InternalTimeServiceManager timeServiceManager
that manages all time services.
When AbstractKeyedCEPPatternOperator is opened, it creates this time service and passes itself in as the triggerTarget:
timerService = getInternalTimerService(
        "watermark-callbacks",
        VoidNamespaceSerializer.INSTANCE,
        this);
processElement calls saveRegisterWatermarkTimer(), whose body is:
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
    timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
}
This logic looks tricky, but it simply registers a timer for currentWatermark + 1 with the timer service.
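The overflow guard relies on Java's wrap-around arithmetic for long. A tiny sketch isolating just that comparison (the class and method names are made up):

```java
// Isolates the overflow check used before registering the watermark timer.
class WatermarkOverflowSketch {

    // At Long.MAX_VALUE, currentWatermark + 1 wraps around to Long.MIN_VALUE,
    // so the comparison is false and no timer would be registered.
    static boolean canRegisterNextTimer(long currentWatermark) {
        return currentWatermark + 1 > currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(canRegisterNextTimer(100L));
        System.out.println(canRegisterNextTimer(Long.MAX_VALUE));
    }
}
```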
In AbstractStreamOperator, when a watermark is received:
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);
}
timeServiceManager.advanceWatermark simply calls advanceWatermark on each of its time services.
Currently the only time service implementation is HeapInternalTimerService.
HeapInternalTimerService.advanceWatermark
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time; // update currentWatermark
    InternalTimer<K, N> timer;
    // Peek at the earliest timer in eventTimeTimersQueue and fire it if its timestamp is <= the new watermark.
    // Recall that we registered a timer for (previous watermark + 1).
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer); // this ends up calling onEventTime
    }
}
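Putting the two halves together: a timer registered at watermark + 1 fires on the next watermark that reaches it. The sketch below models only the timer queue, with bare long timestamps instead of InternalTimer objects and no keys or namespaces; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of an event-time timer queue driven by watermarks.
class TimerServiceSketch {

    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    long currentWatermark() {
        return currentWatermark;
    }

    // Registering the same timestamp twice keeps a single timer.
    void registerEventTimeTimer(long time) {
        if (!eventTimeTimers.contains(time)) {
            eventTimeTimers.add(time);
        }
    }

    // Fires (here: returns) every timer whose timestamp is <= the new watermark.
    List<Long> advanceWatermark(long time) {
        currentWatermark = time;
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= time) {
            fired.add(eventTimeTimers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerServiceSketch service = new TimerServiceSketch();
        service.advanceWatermark(10);
        // What saveRegisterWatermarkTimer does: register at currentWatermark + 1.
        service.registerEventTimeTimer(service.currentWatermark() + 1);
        System.out.println(service.advanceWatermark(10));
        System.out.println(service.advanceWatermark(15));
    }
}
```

A timer at 11 does not fire while the watermark stays at 10, but fires as soon as the watermark advances to 11 or beyond, which is when the buffered elements get processed.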
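One more point to note: for a KeyedStream, how is it guaranteed that each key detects pattern sequences independently?
elementQueueState is keyed state, which is scoped per key by nature, so this is supported naturally.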
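As a rough mental model of per-key scoping, picture keyed state as a map from key to that key's own buffer. This is a sketch of the idea only, not of Flink's state backend, and the names are invented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-key isolation: each key only ever sees its own buffered events.
class KeyedBufferSketch {

    private final Map<String, List<String>> bufferPerKey = new HashMap<>();

    // Appends the event to the buffer that belongs to the given key.
    void processElement(String key, String event) {
        bufferPerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    // Returns the events buffered for one key (empty if the key is unknown).
    List<String> bufferFor(String key) {
        return bufferPerKey.getOrDefault(key, new ArrayList<>());
    }

    public static void main(String[] args) {
        KeyedBufferSketch sketch = new KeyedBufferSketch();
        sketch.processElement("k1", "a");
        sketch.processElement("k2", "x");
        sketch.processElement("k1", "b");
        System.out.println(sketch.bufferFor("k1"));
        System.out.println(sketch.bufferFor("k2"));
    }
}
```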