大数据之输出报警信息

package com.sjw.flink

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Demo job for [[KeyedProcessFunction]].
 *
 * Reads comma-separated sensor lines ("id,timestamp,temperature") from a socket,
 * converts them to [[SensorReading]] records, keys the stream by sensor id and
 * runs [[TempIncreAlert]] on it. Both the parsed input stream and the alert
 * stream are printed.
 */
object ProcessFunctionTest {

  /**
   * Parses one raw socket line into a [[SensorReading]].
   *
   * The first three comma-separated fields are used (id, timestamp as Long,
   * temperature as Double); any further fields are ignored. Surrounding
   * whitespace in each field is trimmed.
   *
   * @param line raw text line received from the socket
   * @return the parsed reading, or `None` when the line has fewer than three
   *         fields or a numeric field cannot be parsed
   */
  private def parseReading(line: String): Option[SensorReading] = {
    val parsed = line.split(",").map(_.trim) match {
      case Array(id, ts, temp, _*) =>
        scala.util.Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ =>
        None
    }
    // Report the dropped line instead of letting the exception fail the whole job.
    if (parsed.isEmpty) Console.err.println(s"Skipping malformed sensor line: $line")
    parsed
  }

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.socketTextStream("sunjunwei1.com", 6666)

    // flatMap over an Option-returning parser: a blank or malformed line is
    // skipped rather than throwing inside the operator.
    val dataStream: DataStream[SensorReading] =
      stream.flatMap((line: String) => parseReading(line).toList)

    val processStream: DataStream[String] = dataStream
      .keyBy(_.id)
      .process(new TempIncreAlert())

    dataStream.print("input")
    processStream.print("precess")

    env.execute()
  }

}

/**
 * Emits an alert for a sensor key when its temperature has risen and has not
 * dropped again within `alertIntervalMs` milliseconds of processing time.
 *
 * Per key, the function keeps the previous temperature and the timestamp of
 * the currently registered processing-time timer. A rise with no timer
 * pending registers a timer; a drop cancels the pending timer; an unchanged
 * temperature leaves the timer as it is. When the timer fires, the alert
 * "<key>温度连续上升" is emitted.
 *
 * The very first reading of a key only initialises the state: there is no
 * previous temperature to compare against, so it neither starts nor cancels
 * a timer.
 *
 * @param alertIntervalMs delay in milliseconds between the first rising
 *                        reading and the alert; must be positive
 */
class TempIncreAlert(alertIntervalMs: Long = 5000L) extends KeyedProcessFunction[String, SensorReading, String] {

  require(alertIntervalMs > 0, "alertIntervalMs must be positive")

  // State holding the temperature of the previous reading. Declared lazy so the
  // runtime context is only accessed once the function is running.
  lazy val lastTemp: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))

  // State holding the timestamp of the registered timer; 0 means "no timer pending".
  lazy val currentTimer: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long]))

  // Marks that at least one reading was seen for the key. Kept separately so a
  // real temperature of 0.0 is not mistaken for "no previous reading".
  private lazy val hasPrevious: ValueState[java.lang.Boolean] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("hasPrevious", classOf[java.lang.Boolean]))

  /**
   * Compares the incoming reading with the previous one of the same key and
   * registers or cancels the alert timer accordingly.
   *
   * @param i         the incoming sensor reading
   * @param context   gives access to the timer service
   * @param collector unused here; alerts are emitted from `onTimer`
   */
  override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {

    val isFirstReading: Boolean = !Option(hasPrevious.value()).exists(_.booleanValue)

    // Read the previous temperature, then store the current one for the next call.
    val preTemp: Double = lastTemp.value()
    lastTemp.update(i.temperature)

    // Timestamp of the pending timer, 0 when none is registered.
    val curTimerTs: Long = currentTimer.value()

    if (isFirstReading) {
      // Nothing to compare against yet: only remember that a reading exists.
      hasPrevious.update(true)
    } else if (i.temperature > preTemp && curTimerTs == 0) {
      // Temperature rose and no timer is pending: start the countdown.
      val timer: Long = context.timerService().currentProcessingTime() + alertIntervalMs
      context.timerService().registerProcessingTimeTimer(timer)
      currentTimer.update(timer)
    } else if (i.temperature < preTemp) {
      // Temperature dropped: the rising run is over, cancel a pending timer.
      if (curTimerTs != 0) {
        context.timerService().deleteProcessingTimeTimer(curTimerTs)
      }
      currentTimer.clear()
    }

  }

  /**
   * Called when the registered timer fires: emits the alert for the current
   * key and clears the timer state so a later rise can register a new timer.
   *
   * @param timestamp timestamp of the firing timer
   * @param ctx       timer context, used to read the current key
   * @param out       collector receiving the alert message
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {

    // Emit the alert for the current key.
    out.collect(s"${ctx.getCurrentKey}温度连续上升")
    // Clear the timer state.
    currentTimer.clear()
  }
}

上一篇:Flink之Watermarks


下一篇:Flink学习(十七) Emitting to Side Outputs(侧输出)