1 工程目录
SensorReading
package com.atguigu.flink.bean

/**
 * One temperature measurement reported by a sensor.
 *
 * @param id          identifier of the sensor that produced the reading
 * @param timestamp   time at which the reading was taken, as a `Long`
 * @param timepreture measured temperature value (the field name is kept exactly as
 *                    spelled, since callers refer to it by this name)
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  timepreture: Double
)
SensorSource
package com.atguigu.flink.source

import java.util.Calendar

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, RichSourceFunction, SourceFunction}

import scala.collection.immutable
import scala.util.Random

/**
 * Flink source that emits simulated temperature readings for ten sensors
 * (`sensor_1` .. `sensor_10`) until it is cancelled.
 *
 * Each sensor gets a fixed base temperature drawn once from a Gaussian
 * distribution when `run` starts. Every loop iteration then emits one reading
 * per sensor, equal to that base plus fresh Gaussian noise.
 */
class SensorSource extends RichSourceFunction[SensorReading] {

  // Flag controlling the emit loop. `cancel()` is invoked from a different
  // thread than the one executing `run()`, so the flag must be volatile for the
  // loop to be guaranteed to observe the update.
  @volatile var running: Boolean = true

  /**
   * Emits readings through the given source context until `running` becomes false.
   *
   * @param sContext context used to hand records to the downstream operators
   */
  override def run(sContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()

    // Base temperature per sensor, generated once with Gaussian noise.
    val curFtemp: immutable.IndexedSeq[(String, Double)] =
      (1 to 10).map(i => ("sensor_" + i, rand.nextGaussian() * 20))

    // Produce an unbounded stream of readings.
    while (running) {
      // Perturb every base temperature with fresh noise for this round.
      val mapTemp: immutable.IndexedSeq[(String, Double)] =
        curFtemp.map { case (sensorId, baseTemp) => (sensorId, baseTemp + (rand.nextGaussian() * 10)) }

      // All readings of one round share the same timestamp (epoch milliseconds).
      val curTime: Long = Calendar.getInstance().getTimeInMillis

      // Emit one reading per sensor.
      mapTemp.foreach { case (sensorId, temp) =>
        sContext.collect(SensorReading(sensorId, curTime, temp))
      }

      // Pause 100 ms between rounds; each round emits ten readings, one per sensor.
      Thread.sleep(100)
    }
  }

  /** Requests the emit loop in `run` to stop after the current iteration. */
  override def cancel(): Unit = {
    running = false
  }
}
主程序入口 ConsumerFromSensorSource
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.source.SensorSource
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

/**
 * Program entry point: attaches a [[com.atguigu.flink.source.SensorSource]] to a
 * streaming environment, prints every record it produces, and runs the job.
 */
object ConsumerFromSensorSource {

  /**
   * Builds and executes the streaming job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment and run everything with parallelism 1.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Register the custom source via addSource and print the resulting stream.
    streamEnv
      .addSource(new SensorSource)
      .print()

    // Submit the job for execution.
    streamEnv.execute()
  }
}