package com.sjw.flink
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* KeyProcessFunction
*/
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com",6666)
val dataStream: DataStream[SensorReading] = stream.map(data => {
val arr: Array[String] = data.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
})
val precessStream: DataStream[String] = dataStream.keyBy(_.id)
.process(new TempIncreAlert())
dataStream.print("input")
precessStream.print("precess")
env.execute()
}
}
class TempIncreAlert() extends KeyedProcessFunction[String,SensorReading,String]{
//定义一个状态,用来保存上一个数据的温度 lazy懒加载 运行时生效
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double]))
//定义一个状态 用来保存定时器的时间戳
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer",classOf[Long]))
override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit ={
//先取出上一个温度
val preTemp: Double = lastTemp.value()
//更新温度值
lastTemp.update(i.temperature)
//获取当前时间
val curTimerTs: Long = currentTimer.value()
if(i.temperature > preTemp && curTimerTs == 0){
//获取时间戳
val timer: Long = context.timerService().currentProcessingTime() + 5000L
//注册定时器
context.timerService().registerProcessingTimeTimer(timer)
//更新定时器
currentTimer.update(timer)
}else if(i.temperature < preTemp || preTemp == 0.0){
//删除定时器
context.timerService().deleteProcessingTimeTimer(curTimerTs)
//清空状态
currentTimer.clear()
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
//输出报警信息 当前key
out.collect(ctx.getCurrentKey + "温度连续上升")
//清理状态
currentTimer.clear()
}
}