package com.myflink

import java.lang.reflect.Field
import java.util

import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.io.{PojoCsvInputFormat, RowCsvInputFormat}
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Main {

  /** Reads sensor readings from a CSV file, assigns event-time timestamps with
    * bounded-out-of-orderness watermarks (10 s), and prints a per-sensor
    * 10-second tumbling-window aggregation as a retract stream.
    *
    * Pitfall (Flink 1.10): `PojoTypeInfo` re-sorts POJO fields alphabetically
    * by name, so the CSV column order may no longer match the type's internal
    * field order, silently mis-assigning columns during parsing. The fix is to
    * pass the actual CSV column order explicitly to `PojoCsvInputFormat`
    * instead of relying on the `PojoTypeInfo` field order.
    *
    * @param args optional; args(0) overrides the default input CSV path.
    */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Build the PojoField list via reflection on the class itself —
    // no throwaway SenSor instance is needed just to reach getDeclaredFields.
    val pojoFields = new util.ArrayList[PojoField]()
    val declaredFields: Array[Field] = classOf[SenSor].getDeclaredFields
    for (field <- declaredFields) {
      pojoFields.add(new PojoField(field, TypeInformation.of(field.getType)))
    }
    val pojoType = new PojoTypeInfo(classOf[SenSor], pojoFields)

    // Input path is overridable from the command line; default keeps the
    // original hard-coded location.
    val inputPath =
      if (args.nonEmpty) args(0)
      else "D:\\allspace\\flink0906\\src\\main\\resources\\input\\test.csv"
    val path = new Path(inputPath)

    // Explicit CSV-column -> POJO-field mapping, in the order the columns
    // actually appear in the file. This decouples parsing from
    // PojoTypeInfo's alphabetically sorted internal field order.
    val csvFieldOrder = Array("id", "timestamp", "uempearture")

    val ds: DataStream[SenSor] = env
      .createInput(new PojoCsvInputFormat(path, pojoType, csvFieldOrder))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SenSor](Time.seconds(10)) {
          // CSV timestamps are epoch seconds; Flink event time is milliseconds.
          override def extractTimestamp(element: SenSor): Long = element.timestamp * 1000
        })

    val tableEnv = StreamTableEnvironment.create(env)
    // 'timestamp.rowtime designates the event-time attribute for windowing.
    val table = tableEnv.fromDataStream(ds, 'id, 'timestamp.rowtime, 'uempearture)

    table
      .window(Tumble over 10.seconds on 'timestamp as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'uempearture.avg, 'tw.end)
      .toRetractStream[Row]
      .print()

    env.execute()
  }
}

// POJO-style record: var fields let Flink treat the case class as a POJO.
// NOTE(review): `uempearture` looks like a misspelling of `temperature`, but
// the Table API expressions above reference it by name, so it is kept as-is.
case class SenSor(var id: String, var timestamp: Long, var uempearture: Double)
这里面有个坑:
SenSor 类的字段在构造 PojoTypeInfo 时会按照字段名重新排序;一旦排序后的字段顺序与 CSV 列的实际顺序不一致,就会直接导致 csv 文件解析出错。规避方法是用 PojoCsvInputFormat 的 fieldNames 参数显式指定与 CSV 列顺序对应的字段名数组。
csv文件内容:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
flink版本为 1.10.1