看看Flink cep如何将pattern转换为NFA?
当来了一条event,它是如何在NFA中被执行的?
前面的链路,CEP –> PatternStream –> select –> CEPOperatorUtils.createPatternStream
1. 产生NFACompiler.compileFactory,完成pattern到state的转换
// Call site: compile the pattern into an NFA factory.
final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
// NOTE(review): the three statements below appear to be the body of NFACompiler.compileFactory
// itself; its method header is not part of this excerpt — confirm.
final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
// Run the compilation on the compiler instance.
nfaFactoryCompiler.compileFactory();
// Build the factory from the serializer, the compiler's window time and states, and the timeout-handling setting.
return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
调用,nfaFactoryCompiler.compileFactory
/**
 * Builds the states for the pattern: the ending state first, then the
 * middle states, and finally the start state.
 */
void compileFactory() {
    // The pattern is walked from its end towards its beginning, so the
    // ending state is the first one to be created.
    final State<T> endingState = createEndingState();

    // Add all the normal states; the returned state is the sink for the start state.
    final State<T> sinkForStart = createMiddleStates(endingState);

    // Add the beginning state.
    createStartState(sinkForStart);
}
可以看到做的工作,主要是生成state,即把pattern转换为NFA中的state和stateTransition
因为pattern是不断往后追加的,每个pattern通过private final Pattern<T, ? extends T> previous指向它前面的pattern,所以遍历pattern时只能从最后一个pattern开始向前回溯
先创建最后的final state
/**
 * Creates the final state and initialises {@code windowTime} from the
 * current pattern.
 *
 * @return the newly created final state
 */
private State<T> createEndingState() {
    final State<T> finalState = createState(ENDING_STATE_NAME, State.StateType.Final);

    // A pattern without a window time yields a window time of 0.
    if (currentPattern.getWindowTime() == null) {
        windowTime = 0L;
    } else {
        windowTime = currentPattern.getWindowTime().toMilliseconds();
    }
    return finalState;
}
很简单,就单纯的创建state
/**
 * Creates a state under a unique internal name and registers both the
 * name and the state with this compiler.
 *
 * @param name      requested state name, made unique via {@code getUniqueInternalStateName}
 * @param stateType type of the state to create
 * @return the newly created state
 */
private State<T> createState(String name, State.StateType stateType) {
    final String uniqueName = getUniqueInternalStateName(name);
    usedNames.add(uniqueName);

    final State<T> created = new State<>(uniqueName, stateType);
    states.add(created);
    return created;
}
继续加middle的state,
/**
 * Converts every pattern that has a predecessor into a state, walking the
 * pattern chain backwards, and lowers {@code windowTime} to the smallest
 * window time seen on the way.
 *
 * @param sinkState state the last pattern of the chain should lead to
 * @return the state created for the earliest converted pattern, or
 *         {@code sinkState} if the current pattern has no predecessor
 */
private State<T> createMiddleStates(final State<T> sinkState) {
    // Sink produced by the most recently converted pattern.
    State<T> lastSink = sinkState;
    while (currentPattern.getPrevious() != null) {
        checkPatternNameUniqueness(currentPattern.getName());
        // Convert the current pattern into a state, using lastSink as its sink.
        lastSink = convertPattern(lastSink);

        // we traverse the pattern graph backwards
        followingPattern = currentPattern;
        currentPattern = currentPattern.getPrevious();

        // This declaration must be a statement of its own: if it is left inside
        // the preceding line comment, currentWindowTime is undeclared below.
        final Time currentWindowTime = currentPattern.getWindowTime();
        if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
            // the window time is the global minimum of all window times of each state
            windowTime = currentWindowTime.toMilliseconds();
        }
    }
    return lastSink;
}
调用convertPattern,
/**
 * Converts the current pattern into a singleton state that leads to
 * {@code sinkState}, then adds stop states for it.
 *
 * @param sinkState state the converted pattern should lead to
 * @return the state created for the current pattern
 */
private State<T> convertPattern(final State<T> sinkState) {
    // Only the singleton case is covered here.
    final State<T> converted = createSingletonState(sinkState);
    addStopStates(converted);
    return converted;
}
createSingletonState
private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition(); //从pattern里面取出condition
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal); //对currentPattern创建singletonState
// if event is accepted then all notPatterns previous to the optional states are no longer valid
singletonState.addTake(sink, currentCondition); //设置take StateTransition if (isOptional) {
// if no element accepted the previous nots are still valid.
singletonState.addProceed(sinkState, trueFunction); //如果有Optional,设置Proceed StateTransition
} return singletonState;
}
addTake
addStateTransition
/**
 * Appends a transition that starts at this state.
 *
 * @param action      action of the transition
 * @param targetState state the transition leads to
 * @param condition   condition attached to the transition
 */
public void addStateTransition(
        final StateTransitionAction action,
        final State<T> targetState,
        final IterativeCondition<T> condition) {
    final StateTransition<T> transition =
        new StateTransition<T>(this, action, targetState, condition);
    stateTransitions.add(transition);
}
createStartState
/**
 * Converts the current pattern into a state and marks that state as the
 * start state.
 *
 * @param sinkState state the start state should lead to
 * @return the start state
 */
private State<T> createStartState(State<T> sinkState) {
    checkPatternNameUniqueness(currentPattern.getName());

    final State<T> startState = convertPattern(sinkState);
    startState.makeStart();
    return startState;
}
2. 当event coming,如何处理?
AbstractKeyedCEPPatternOperator.processElement
// Obtain the NFA to run this element against.
NFA<IN> nfa = getNFA();
// Feed the element's value to the NFA, using the current processing time as the timestamp.
processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
// NOTE(review): presumably stores the NFA back into state; updateNFA is not shown here — confirm.
updateNFA(nfa);
如果statebackend里面有就取出来,否则nfaFactory.createNFA
/**
 * Returns the NFA held in {@code nfaOperatorState}, or a new one from
 * {@code nfaFactory} when the state holds none.
 *
 * @return the stored NFA, or a newly created one
 * @throws IOException declared by this method; the excerpt does not show which call raises it
 */
private NFA<IN> getNFA() throws IOException {
    final NFA<IN> stored = nfaOperatorState.value();
    if (stored == null) {
        // Nothing stored yet: create a fresh NFA.
        return nfaFactory.createNFA();
    }
    return stored;
}
createNFA
// Build a new NFA from a duplicate of the input serializer, the window time and the timeout-handling setting.
NFA<T> result = new NFA<>(inputTypeSerializer.duplicate(), windowTime, timeoutHandling);
// Register the states with the new NFA.
result.addStates(states);
addState
/**
 * Adds every state of the given collection to this NFA, in iteration order.
 *
 * @param newStates states to add
 */
public void addStates(final Collection<State<T>> newStates) {
    for (final State<T> candidate : newStates) {
        addState(candidate);
    }
}

/**
 * Adds a single state to this NFA. For a start state, a start computation
 * state is additionally added to {@code computationStates}.
 *
 * @param state state to add
 */
public void addState(final State<T> state) {
    states.add(state);

    if (!state.isStart()) {
        return;
    }
    computationStates.add(ComputationState.createStartState(this, state));
}
把states加入到NFA,
start state会加入computationStates,因为pattern的识别总是从start开始
KeyedCEPPatternOperator – > processEvent
/**
 * Runs one event through the NFA and emits the matched sequences it returns.
 *
 * @param nfa       NFA to process the event with
 * @param event     event to process
 * @param timestamp timestamp passed to the NFA and to the emit call
 */
protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
    final Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> nfaOutput =
        nfa.process(event, timestamp);

    // Only the first tuple field is used here; the second one is ignored.
    emitMatchedSequences(nfaOutput.f0, timestamp);
}
NFA –> process
/**
 * Processes one event: advances every current computation state, collects
 * the matches of computations that reached a final state, discards paths
 * that reached a stop state, and prunes the shared buffer by window time.
 *
 * @param event     the event to process
 * @param timestamp timestamp of the event
 * @return a tuple of (matched sequences, timeout results); nothing in this
 *         method adds to the timeout collection, so it is returned empty
 */
public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(final T event, final long timestamp) {
    final int numberComputationStates = computationStates.size();
    final Collection<Map<String, List<T>>> result = new ArrayList<>();
    final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();

    // iterate over all current computations; the count is fixed up front so that
    // states added during this call are not processed for the same event
    for (int i = 0; i < numberComputationStates; i++) {
        ComputationState<T> computationState = computationStates.poll();

        // Successor computation states for this event. This must be a real statement:
        // if it is left inside the preceding line comment, newComputationStates is
        // undeclared in the loop below.
        final Collection<ComputationState<T>> newComputationStates =
            computeNextStates(computationState, event, timestamp);

        // delay adding new computation states in case a stop state is reached and we discard the path.
        final Collection<ComputationState<T>> statesToRetain = new ArrayList<>();
        // if stop state reached in this path
        boolean shouldDiscardPath = false;

        for (final ComputationState<T> newComputationState : newComputationStates) {
            if (newComputationState.isFinalState()) {
                // we've reached a final state and can thus retrieve the matching event sequence
                Map<String, List<T>> matchedPattern = extractCurrentMatches(newComputationState);
                result.add(matchedPattern);

                // remove found patterns because they are no longer needed
                eventSharedBuffer.release(
                    newComputationState.getPreviousState().getName(),
                    newComputationState.getEvent(),
                    newComputationState.getTimestamp(),
                    computationState.getCounter());
            } else if (newComputationState.isStopState()) {
                // reached stop state. release entry for the stop state
                shouldDiscardPath = true;
                eventSharedBuffer.release(
                    newComputationState.getPreviousState().getName(),
                    newComputationState.getEvent(),
                    newComputationState.getTimestamp(),
                    computationState.getCounter());
            } else {
                // add new computation state; it will be processed once the next event arrives
                statesToRetain.add(newComputationState);
            }
        }

        if (shouldDiscardPath) {
            // a stop state was reached in this branch. release branch which results in removing previous event from
            // the buffer
            for (final ComputationState<T> state : statesToRetain) {
                eventSharedBuffer.release(
                    state.getPreviousState().getName(),
                    state.getEvent(),
                    state.getTimestamp(),
                    state.getCounter());
            }
        } else {
            // keep the intermediate states for the next event
            computationStates.addAll(statesToRetain);
        }
    }

    // prune shared buffer based on window length
    if (windowTime > 0L) {
        long pruningTimestamp = timestamp - windowTime;

        if (pruningTimestamp < timestamp) {
            // the check is to guard against underflows

            // remove all elements which are expired
            // with respect to the window length
            eventSharedBuffer.prune(pruningTimestamp);
        }
    }

    return Tuple2.of(result, timeoutResult);
}
computeNextStates
/**
 * Computes the computation states that follow {@code computationState}
 * when {@code event} arrives, by walking the outgoing IGNORE and TAKE
 * edges returned by {@code createDecisionGraph}.
 *
 * <p>NOTE(review): {@code totalTakeToSkip}, {@code ignoreBranchesToVisit},
 * {@code currentVersion} and {@code nextVersion} are used below but are not
 * declared anywhere in this excerpt; their declarations must come from the
 * full source — confirm before compiling this snippet.
 *
 * @param computationState computation state to advance
 * @param event            the incoming event
 * @param timestamp        timestamp of the event
 * @return the resulting computation states
 */
private Collection<ComputationState<T>> computeNextStates(
        final ComputationState<T> computationState,
        final T event,
        final long timestamp) {

    // Collect all outgoing edges of the current state for this event.
    final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);

    // These two declarations must be real statements: if they are left inside the
    // preceding line comment, neither name exists for the loop below.
    final List<StateTransition<T>> edges = outgoingEdges.getEdges();
    final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();

    for (StateTransition<T> edge : edges) {
        switch (edge.getAction()) {
            case IGNORE: {
                if (!computationState.isStartState()) {
                    final DeweyNumber version;
                    if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
                        //Stay in the same state (it can be either looping one or singleton)
                        final int toIncrease = calculateIncreasingSelfState(
                            outgoingEdges.getTotalIgnoreBranches(),
                            outgoingEdges.getTotalTakeBranches());
                        version = computationState.getVersion().increase(toIncrease);
                    } else {
                        //IGNORE after PROCEED
                        version = computationState.getVersion()
                            .increase(totalTakeToSkip + ignoreBranchesToVisit)
                            .addStage();
                        ignoreBranchesToVisit--;
                    }

                    // An IGNORE edge does not take the event: the target state is added with the
                    // previous state, event, counter and timestamp carried over unchanged.
                    addComputationState(
                        resultingComputationStates,
                        edge.getTargetState(),
                        computationState.getPreviousState(),
                        computationState.getEvent(),
                        computationState.getCounter(),
                        computationState.getTimestamp(),
                        version,
                        computationState.getStartTimestamp()
                    );
                }
            }
            break;
            case TAKE:
                final State<T> nextState = edge.getTargetState();
                final State<T> currentState = edge.getSourceState();
                final State<T> previousState = computationState.getPreviousState();

                final T previousEvent = computationState.getEvent();

                final int counter;
                final long startTimestamp;
                // For TAKE, the event is recorded in eventSharedBuffer under the current state's name.
                if (computationState.isStartState()) {
                    // First event of a path: it has no predecessor entry and starts the path's clock.
                    startTimestamp = timestamp;
                    counter = eventSharedBuffer.put(
                        currentState.getName(),
                        event,
                        timestamp,
                        currentVersion);
                } else {
                    // Later event: link it to the previous entry of the same path.
                    startTimestamp = computationState.getStartTimestamp();
                    counter = eventSharedBuffer.put(
                        currentState.getName(),
                        event,
                        timestamp,
                        previousState.getName(),
                        previousEvent,
                        computationState.getTimestamp(),
                        computationState.getCounter(),
                        currentVersion);
                }

                addComputationState(
                    resultingComputationStates,
                    nextState,
                    currentState,
                    event,
                    counter,
                    timestamp,
                    nextVersion,
                    startTimestamp);

                //check if newly created state is optional (have a PROCEED path to Final state)
                final State<T> finalState = findFinalStateAfterProceed(nextState, event, computationState);
                if (finalState != null) {
                    addComputationState(
                        resultingComputationStates,
                        finalState,
                        currentState,
                        event,
                        counter,
                        timestamp,
                        nextVersion,
                        startTimestamp);
                }
                break;
        }
    }

    return resultingComputationStates;
}
/**
 * Collects the IGNORE and TAKE transitions whose condition accepts
 * {@code event}, starting at the state of {@code computationState} and
 * following every accepted PROCEED transition to its target state.
 *
 * @param computationState computation state whose state is the starting point
 * @param event            event the transition conditions are checked against
 * @return the collected outgoing edges
 * @throws RuntimeException wrapping any exception raised while evaluating a transition
 */
private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
    final OutgoingEdges<T> graph = new OutgoingEdges<>(computationState.getState());

    final Stack<State<T>> pending = new Stack<>();
    pending.push(computationState.getState());

    // First create all outgoing edges, so to be able to reason about the Dewey version
    while (!pending.isEmpty()) {
        final State<T> visited = pending.pop();

        // check all state transitions for each state
        for (StateTransition<T> transition : visited.getStateTransitions()) {
            try {
                if (!checkFilterCondition(computationState, transition.getCondition(), event)) {
                    // filter condition is false: this transition contributes nothing
                    continue;
                }
                switch (transition.getAction()) {
                    case PROCEED:
                        // PROCEED is equivalent to an epsilon transition: move on to the
                        // target state and evaluate its transitions against the same event
                        pending.push(transition.getTargetState());
                        break;
                    case IGNORE:
                    case TAKE:
                        // both kinds are recorded as outgoing edges
                        graph.add(transition);
                        break;
                }
            } catch (Exception e) {
                throw new RuntimeException("Failure happened in filter function.", e);
            }
        }
    }
    return graph;
}