flink 读取 CSV 文件,并将 DataStream 转 Table 对象

package com.myflink

import java.lang.reflect.Field
import java.util

import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.io.{PojoCsvInputFormat, RowCsvInputFormat}
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Main {

  /**
   * Reads sensor readings from a CSV file as a DataStream[SenSor], converts the
   * stream to a Table with an event-time attribute, and prints a 10-second
   * tumbling-window aggregation (count and average per sensor id).
   */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Build a PojoTypeInfo from SenSor's declared fields via reflection.
    // PITFALL: PojoTypeInfo internally re-sorts its fields alphabetically by
    // name, so the order of `lst` here does NOT determine the CSV column order.
    val senSor = SenSor("a", 0L, 0.2)
    val lst = new util.ArrayList[PojoField]()
    val arrFields: Array[Field] = senSor.getClass.getDeclaredFields
    for (field <- arrFields) {
      lst.add(new PojoField(field, TypeInformation.of(field.getType)))
    }

    val path = new Path("D:\\allspace\\flink0906\\src\\main\\resources\\input\\test.csv")

    // FIX for the pitfall documented below: pass the CSV column order explicitly
    // (third constructor argument) so PojoCsvInputFormat maps each CSV column to
    // the right POJO field regardless of PojoTypeInfo's alphabetical re-ordering.
    val csvColumnOrder = Array("id", "timestamp", "uempearture")
    val ds: DataStream[SenSor] = env
      .createInput(new PojoCsvInputFormat(path, new PojoTypeInfo(classOf[SenSor], lst), csvColumnOrder))
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SenSor](Time.seconds(10)) {
        // CSV timestamps are epoch seconds; Flink event time is milliseconds.
        override def extractTimestamp(element: SenSor): Long = element.timestamp * 1000
      })

    val tableEnv = StreamTableEnvironment.create(env)
    // 'timestamp.rowtime declares the event-time attribute used by the window.
    val table = tableEnv.fromDataStream(ds, 'id, 'timestamp.rowtime, 'uempearture)
    table.window(Tumble over 10.seconds on 'timestamp as 'tw)
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'uempearture.avg, 'tw.end)
      .toRetractStream[Row].print()

    env.execute()
  }

}

/**
 * One sensor reading parsed from the CSV file.
 *
 * Fields are `var` (mutable) so Flink's POJO serialization can write into them.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timestamp   event time in epoch seconds (multiplied to millis downstream)
 * @param uempearture reading value (typo for "temperature"; kept unchanged because
 *                    the Table API expressions reference this field by name)
 */
case class SenSor(var id: String, var timestamp: Long, var uempearture: Double)

  这里面的坑:

SenSor 对象的属性,在构造 PojoTypeInfo 时按照名字重排序,这直接造成对 csv 文件解析出错。

csv文件内容:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4

flink版本为 1.10.1




上一篇:Flink实例(四十二): Operators(三)FILTER


下一篇:flink_根据官网搭建项目_DataStream API