Flink CEP

本周头疼少了一更,下周补上

FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。 它允许你在×××的事件流中检测事件模式,让你有机会掌握数据中重要的事项。

本文描述了Flink CEP中可用的API调用。 首先介绍Pattern API,它允许你指定要在流中检测的模式,然后介绍如何检测匹配事件序列并对其进行操作。
然后,我们将介绍CEP库在处理事件时间延迟时所做的假设。

首先是要在你的pom.xml文件中,引入CEP库。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

注意要应用模式匹配的DataStream中的事件必须实现正确的equals()和hashCode()方法,因为FlinkCEP使用它们来比较和匹配事件。

package com.thoughtworks.loginfail

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// 登录样例类
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

// 输出报警信息样例类
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMSG: String)

object LoginFailWithCEP {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // 自定义测试数据
    val loginStream = env.fromCollection(List(
      LoginEvent(1, "192.168.0.1", "fail", 1558430842),
      LoginEvent(1, "192.168.0.2", "success", 1558430843),
      LoginEvent(1, "192.168.0.3", "fail", 1558430844),
      LoginEvent(1, "192.168.0.3", "fail", 1558430847),
      LoginEvent(1, "192.168.0.3", "fail", 1558430848),
      LoginEvent(2, "192.168.10.10", "success", 1558430850)
    ))
      .assignAscendingTimestamps(_.eventTime * 1000)

    // 定义pattern,对事件流进行模式匹配
    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.eventType == "fail")
      .next("next")
      .where(_.eventType == "fail")
      .within(Time.seconds(2))

    // 在输入流的基础上应用pattern,得到匹配的pattern stream
    val patternStream = CEP.pattern(loginStream.keyBy(_.userId), loginFailPattern)

    // 用select方法从pattern stream中提取输出数据流
    //    import scala.collection.Map
    //    val loginFailDataStream : DataStream[Warning] = patternStream.select( ( patternEvents: Map[String, Iterable[LoginEvent]] ) => {
    //      // 从Map里取出对应的登录失败事件,然后包装成warning
    //      val firstFailEvent = patternEvents.getOrElse("begin", null).iterator.next()
    //      val secondFailEvent = patternEvents.getOrElse("next", null).iterator.next()
    //      Warning( firstFailEvent.userId, firstFailEvent.eventTime, secondFailEvent.eventTime, "login fail waring" )
    //    } )
    
    val loginFailDataStream = patternStream.select(new MySelectFuction())

    // 将得到的警告信息流输出sink
    loginFailDataStream.print("warning")

    env.execute("Login Fail Detect with CEP")
  }
}

class MySelectFuction() extends PatternSelectFunction[LoginEvent, Warning] {
  override def select(patternEvents: util.Map[String, util.List[LoginEvent]]): Warning = {
    val firstFailEvent = patternEvents.getOrDefault("begin", null).iterator.next()
    val secondFailEvent = patternEvents.getOrDefault("next", null).iterator.next()
    Warning(firstFailEvent.userId, firstFailEvent.eventTime, secondFailEvent.eventTime, "login fail waring")
  }
}
上一篇:jsp页面的form表单验证用户名和密码不能为空!


下一篇:Flink_恶意登录监控 (利用CEP)