代码:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
/**
 * Sample case class for a single temperature sensor reading.
 *
 * @param id          identifier of the sensor that produced the reading
 * @param timestamp   time of the reading (presumably epoch seconds — TODO confirm)
 * @param temperature measured temperature value (unit is not stated in this file)
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Demo entry point showing different Flink sources.
 *
 * The active variant consumes strings from a Kafka topic and prints them.
 * The collection-based and file-based variants are kept as commented-out
 * code for reference.
 */
object SourceTest {

  /**
   * Builds and runs the streaming job: reads strings from the Kafka topic
   * "sensor", prints each record with the "stream3" prefix, and executes the
   * job under the name "source test".
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Supplies the implicit TypeInformation instances required by the Scala DataStream API.
    import org.apache.flink.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a user-defined collection
    // val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),
    //   SensorReading("sensor_6", 1547718201, 15.402984393403084),
    //   SensorReading("sensor_7", 1547718202, 6.720945201171228),
    //   SensorReading("sensor_10", 1547718205, 38.101067604893444)
    // ))
    // stream1.print("stream1").setParallelism(1)
    // env.execute("source test")
    //
    // 2. Read data from a file
    // val stream2 = env.readTextFile("sensor.txt")
    // stream2.print("stream2").setParallelism(1)
    // env.execute("source test")

    // 3. Use a Kafka topic as the data source
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    // key.deserializer / value.deserializer are deliberately not set: the Flink
    // Kafka consumer always fetches raw bytes and decodes them with the
    // DeserializationSchema passed below, so those settings would be ignored
    // (and only produce a warning at startup).
    properties.setProperty("auto.offset.reset", "latest")

    val stream3 = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
    )

    // Print through a single sink task (parallelism 1).
    stream3.print("stream3").setParallelism(1)

    env.execute("source test")
  }
}
1.启动ZooKeeper
2.启动Kafka
3.启动Producer并发送数据
4.结果