Flink 可以通过四种方式读取数据:集合、文件、Kafka 以及自定义数据源。代码如下:
package apiTest
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
import scala.util.Random
/**
 * A single reading reported by a temperature sensor.
 *
 * @param id          identifier of the sensor, e.g. "sensor_1"
 * @param timestamp   time of the reading as a numeric timestamp
 * @param temperature measured temperature
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Demo job that reads data from four kinds of Flink sources: an in-memory
 * collection, a text file, a Kafka topic and a custom source function.
 * Every stream is printed to standard output.
 */
object SourceTest {

  /**
   * Builds the streaming job and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read from a collection.
    // The last element deliberately has no trailing comma: that syntax is only
    // accepted from Scala 2.12.2 on and would not compile on Scala 2.11.
    val dataList = List(
      SensorReading("sensor_1", 1234567898L, 35.8),
      SensorReading("sensor_2", 1223567898L, 36.8),
      SensorReading("sensor_3", 1234345898L, 33.8),
      SensorReading("sensor_4", 1234567458L, 32.8),
      SensorReading("sensor_5", 1256454898L, 39.8)
    )
    val stream1 = env.fromCollection(dataList)
    stream1.print()

    // 2. Read from a text file (hard-coded local path).
    val inputPath = "D:\\IdeaProjects\\flink_tutorial\\src\\main\\resources\\hello.txt"
    val stream2 = env.readTextFile(inputPath)
    stream2.print()

    // 3. Read from Kafka: topic "sensor", each record decoded as a plain string.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    val stream3 = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
    )
    // Print it like the other streams so the Kafka source is actually consumed
    // by the job. This requires a broker reachable at the address configured above.
    stream3.print()

    // 4. Read from a custom source.
    val stream4 = env.addSource(new MySensorSource())
    stream4.print()

    // Submit the job.
    env.execute("source test")
  }
}
/**
 * Custom source that simulates ten temperature sensors ("sensor_1" to
 * "sensor_10"). Each sensor starts at a random temperature in [0, 100); on every
 * iteration the temperatures drift by a Gaussian step and one reading per
 * sensor is emitted, followed by a 100 ms pause, until the source is cancelled.
 */
class MySensorSource() extends SourceFunction[SensorReading] {

  // Flag that tells run() whether it should keep emitting data.
  // Marked volatile because cancel() may be invoked from a different thread than
  // the one executing run(); without it the loop might never observe the update.
  @volatile var running: Boolean = true

  /** Requests the emitting loop in run() to stop. */
  override def cancel(): Unit = running = false

  /**
   * Emits simulated readings until cancel() is called.
   *
   * @param ctx context used to emit the generated readings
   */
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator for the initial values and the drift.
    val rand = new Random()

    // Initial (id, temperature) pairs for the ten sensors.
    var curTemp = (1 to 10).map(i => (s"sensor_$i", rand.nextDouble() * 100))

    // Keep producing data until cancelled.
    while (running) {
      // Nudge every temperature slightly relative to its previous value.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // Stamp the whole batch with the current time and emit it.
      val curTime = System.currentTimeMillis()
      curTemp.foreach { case (id, temp) => ctx.collect(SensorReading(id, curTime, temp)) }

      // Wait 100 ms between batches.
      Thread.sleep(100)
    }
  }
}