Flink streaming UDF: ScalarFunction

package UDFTestScalar

import ceshi.SensorReading
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions._
import org.apache.flink.types.Row

object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)


    val filePath = "C:\\代码\\myflink\\src\\main\\resources\\sensor"
    /*
    Sample contents of the sensor file:
    aa 10 1547718199
    aa 11 1547718200
     */

    val inputStream = env.readTextFile(filePath)
    val dataStream = inputStream.map(line => {
      val arr = line.split(" ")
      SensorReading(arr(0), arr(1).toDouble, arr(2).toLong)
    })
      // Watermarks are assigned up front here: accept events that arrive up to 1 second late
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = {
          element.shiJianChuo * 1000L  // convert seconds to milliseconds
        }
      })
    // Designate the shiJianChuo field as the event-time (rowtime) attribute
    val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'wendu, 'shiJianChuo.rowtime() as 'ts)

    // Call the custom hash function on the id column
    // 1. Table API
//    val hashCode1 = new HashCode(23)
//    val resultTable1 = sensorTable
//      .select('id, 'ts, hashCode1('id))
//    resultTable1.toAppendStream[Row].print("table api")

    // 2. SQL
    val hashCode1 = new HashCode(23)
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("hashCode1", hashCode1)
    val resultTable2 = tableEnv.sqlQuery(
      """
        |select
        |id,
        |ts,
        |hashCode1(id)
        |from sensor
        |""".stripMargin
    )
    resultTable2.toAppendStream[Row].print("sql api")


    env.execute("table execute")
  }

}

// Custom scalar function. factor: multiplier applied to the hash code; s: the input string
class HashCode(factor: Int) extends ScalarFunction {
  def eval(s: String): Int = {
    s.hashCode * factor - 10000
  }
}
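
The SensorReading case class imported from the ceshi package is not shown in this post. A minimal sketch, assuming its fields match how the code above constructs and reads it (a sensor id, a temperature field named wendu, and an event timestamp in seconds named shiJianChuo):

package ceshi

// Hypothetical definition, inferred from the usage above:
// id = sensor id, wendu = temperature, shiJianChuo = event timestamp in seconds
case class SensorReading(id: String, wendu: Double, shiJianChuo: Long)

Note that registerFunction is deprecated in newer Flink releases; there the equivalent registration would be tableEnv.createTemporarySystemFunction("hashCode1", hashCode1), while the code above keeps the older API this post was written against.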

 
