package UDFTestScalar
import ceshi.SensorReading
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions._
import org.apache.flink.types.Row
object ScalarFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)
val filePath = "C:\\代码\\myflink\\src\\main\\resources\\sensor"
/*
aa 10 1547718199
aa 11 1547718200
*/
val inputStream = env.readTextFile(filePath)
val dataStream = inputStream.map(date => {
val arr = date.split(" ")
SensorReading(arr(0), arr(1).toDouble, arr(2).toLong)
})
// 这里已经提前定义好水位线了 最大接受延迟1s的数据
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = {
element.shiJianChuo * 1000L // 毫秒数
}
})
// 指定某个字段为事件时间
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'wendu, 'shiJianChuo.rowtime() as 'ts)
// 调用自定义hash函数 对id进行hash运算
// 1 table api
// val hashCode1 = new HashCode(23)
// val resultTable1 = sensorTable
// .select('id, 'ts, hashCode1('id))
// resultTable1.toAppendStream[Row].print("table api")
// 2 sql
val hashCode1 = new HashCode(23)
tableEnv.createTemporaryView("sensor",sensorTable)
tableEnv.registerFunction("hashCode1",hashCode1)
val resultTable2 = tableEnv.sqlQuery(
"""
|select
|id,
|ts,
|hashCode1(id)
|from sensor
|""".stripMargin
)
resultTable2.toAppendStream[Row].print("sql api")
env.execute("table execute")
}
}
// 自定义标量函数 factor:随机数因子 s:输入元素
class HashCode(factor:Int) extends ScalarFunction{
def eval(s:String): Int ={
s.hashCode * factor - 10000
}
}