Flink——四种读取数据方式(集合、文件、kafka、自定义数据源)

Flink分别从集合、文件、kafka和自定义数据源四种方式中读取数据。代码如下:

package apiTest

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties
import scala.util.Random

//定义样例类,温度传感器
case class SensorReading(id:String, timestamp: Long, temperature:Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //1. 从集合中读取数据
    val dataList = List(
      SensorReading("sensor_1",1234567898,35.8),
      SensorReading("sensor_2",1223567898,36.8),
      SensorReading("sensor_3",1234345898,33.8),
      SensorReading("sensor_4",1234567458,32.8),
      SensorReading("sensor_5",1256454898,39.8),
    )
    val stream1 = env.fromCollection(dataList)
    stream1.print()

    //2. 从文件中读取数据
    val inputPath = "D:\\IdeaProjects\\flink_tutorial\\src\\main\\resources\\hello.txt"
    val stream2 = env.readTextFile(inputPath)
    stream2.print()

    //3. 从kafka中读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","localhost:9092")
    properties.setProperty("group.id","consumer-group")
    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(),properties))

    //4. 自定义Sourse
    val stream4 = env.addSource(new MySensorSource())
    stream4.print()
    //执行
    env.execute("source test")
  }
}

// 自定义SourceFunction
class MySensorSource() extends SourceFunction[SensorReading]{
  // 定义一个标识位flag,用来表示数据源是否正常发出数据
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    //定义一个随机数发生器
    val rand = new Random()

    //随即生成一组(10个)传感器的初始温度(id,temp)
    var curTemp = 1.to(10).map(i=>("sensor_"+i,rand.nextDouble()*100))

    //定义无限循环,不停的产生数据,除非被cancel
    while(running){
    //在上次数据基础上微调,更新温度值
      curTemp = curTemp.map(
        data => (data._1, data._2+rand.nextGaussian())
      )
      //获取当前时间戳,加入到数据中,调用ctx.collect发送数据
      val curTime = System.currentTimeMillis()
      curTemp.foreach(data => ctx.collect(SensorReading(data._1,curTime,data._2)))
      //间隔100ms
      Thread.sleep(100)
    }
  }

}
上一篇:Mtk Camera Hal到驱动的流程(3)


下一篇:电赛总结 + openmv4 实现数字识别