Flink Streaming API
The Flink streaming API breaks down into four parts: environment, source, transform, and sink.
environment
getExecutionEnvironment
Depending on how the program is launched, this call delegates under the hood to one of the following two methods:
createLocalEnvironment and createRemoteEnvironment, which create a local and a remote execution environment respectively.
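A minimal sketch of the three calls; the remote host, port, and jar path below are placeholders, not values from a real cluster:

import org.apache.flink.streaming.api.scala._

object EnvDemo {
  def main(args: Array[String]): Unit = {
    // Picks a local or remote environment based on how the job is launched
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Or force one explicitly:
    val localEnv = StreamExecutionEnvironment.createLocalEnvironment(1) // local, parallelism 1
    val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
      "jobmanager-host", 6123, "/path/to/job.jar") // hypothetical host, port, and jar
  }
}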
source
Sources come in five flavors: reading from a collection, reading from a file, reading from individual elements, consuming from Kafka, and a custom source.
Reading from a collection
import org.apache.flink.streaming.api.scala._

// Sample case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 1. Read from a user-defined collection
    val stream1 = env.fromCollection(List(
      SensorReading("1", 1234567897, 35.1),
      SensorReading("2", 1234567898, 36.1),
      SensorReading("3", 1234567899, 37.1),
      SensorReading("4", 1234567890, 38.1)
    ))
    stream1.print("stream").setParallelism(1)
    env.execute("source test")
  }
}
Reading from a file
import org.apache.flink.streaming.api.scala._

// Sample case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read from a text file, one record per line
    val stream2 = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    stream2.print("stream").setParallelism(1)
    env.execute("source test")
  }
}
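readTextFile yields one String per line, so to get SensorReading values the lines still have to be parsed. A minimal sketch, assuming each line is comma-separated like sensor_1,1547718199,35.8 (the actual file format is an assumption, not shown in this post):

// Hypothetical parsing step: assumes lines of the form "id,timestamp,temperature"
val sensorStream = stream2.map { line =>
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}
sensorStream.print("parsed").setParallelism(1)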
Reading from elements
import org.apache.flink.streaming.api.scala._

// Sample case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 3. Read from individual elements; mixed types are inferred as Any
    env.fromElements(1, 2.0, "string").print()
    env.execute("source test")
  }
}
Consuming from Kafka
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// Sample case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The sources above are all bounded streams, which can be treated as batch input;
    // production environments mostly deal with unbounded stream data.
    // 4. Consume from Kafka
    val consumer = new FlinkKafkaConsumer[String]("qml", new SimpleStringSchema(), KafkaProperties.getProperties())
    val stream3 = env.addSource(consumer)
    stream3.print("stream").setParallelism(1)
    env.execute("source test")
  }
}

object KafkaProperties {
  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.0.80:9092")
    properties.setProperty("group.id", "consumer-qml")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1048576")           // max bytes per fetch: 1 MB
    properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880") // max bytes per partition: 5 MB
    properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties
  }
}
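With enable.auto.commit set to true and no checkpointing, offsets are committed by the Kafka client on its own schedule. Once Flink checkpointing is enabled, FlinkKafkaConsumer instead stores the offsets in the checkpoint itself (and commits them back to Kafka on checkpoint completion), so a restore resumes from the checkpointed offsets rather than relying on auto commit. A minimal sketch (the 5-second interval is an arbitrary example):

// Checkpoint every 5 seconds; Kafka offsets are then committed on checkpoint completion
env.enableCheckpointing(5000)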
Custom source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

// Sample case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 5. Custom source
    val stream5 = env.addSource(new SensorSource)
    stream5.print("stream").setParallelism(1)
    env.execute("source test")
  }
}

class SensorSource() extends SourceFunction[SensorReading] {
  // Keeps the emit loop alive; cleared by cancel()
  var running: Boolean = true

  // Generate data
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initialize ten sensors with readings around 60
    var curdata = 1.to(10).map(
      i => ("sensor" + i, 60 + rand.nextGaussian() * 10)
    )
    // Emit a reading per sensor every second
    while (running) {
      // Drift each reading slightly from its previous value
      curdata = curdata.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      val curTime = System.currentTimeMillis()
      curdata.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )
      Thread.sleep(1000)
    }
  }

  // Stop data generation
  override def cancel(): Unit = {
    running = false
  }
}
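A SourceFunction like the one above always runs with parallelism 1. When parallel instances are needed, the same pattern can extend ParallelSourceFunction instead; a minimal sketch (the id is illustrative, and disambiguating ids per subtask is left out):

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

// Each parallel subtask runs its own copy of run()
class ParallelSensorSource extends ParallelSourceFunction[SensorReading] {
  @volatile var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      ctx.collect(SensorReading("sensor_p", System.currentTimeMillis(), 60 + rand.nextGaussian() * 10))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}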