Flink Streaming API: Environment and Source

The Flink streaming API is divided into four modules: environment, source, transform, and sink.

environment

getExecutionEnvironment

Depending on how the program is executed, this call internally uses one of the following two methods to create the matching environment:

createLocalEnvironment and createRemoteEnvironment, which create a local and a remote execution environment respectively.
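A minimal sketch of all three calls (the host, port, and jar path passed to createRemoteEnvironment are placeholders, not values from this article):

import org.apache.flink.streaming.api.scala._

// Picks a local or remote environment automatically based on the run context
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Explicit local environment, here with parallelism 1
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(1)

// Explicit remote environment: JobManager host, port, and the job jar to ship (placeholder values)
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, "path/to/job.jar")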

source

Sources come in four flavors: reading from a collection (or from individual elements), reading from a file, consuming from Kafka, and custom sources.

Reading from a collection

import org.apache.flink.streaming.api.scala._

// Case class for the sample records
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read from a hand-built collection
    val stream1 = env.fromCollection(List(
      SensorReading("1", 1234567897, 35.1),
      SensorReading("2", 1234567898, 36.1),
      SensorReading("3", 1234567899, 37.1),
      SensorReading("4", 1234567890, 38.1)
    ))
    stream1.print("stream").setParallelism(1)
    env.execute("source test")
  }
}
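Note that the string passed to print is just a prefix attached to each output record, and setParallelism(1) applies to the print sink, so all records go through a single output task and are not interleaved across subtasks.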
 

Reading from a file

import org.apache.flink.streaming.api.scala._

// Case class for the sample records
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Read from a text file, one record per line
    val stream2 = env.readTextFile("E:/qmlidea/flink/src/main/resources/sensor.txt")
    stream2.print("stream").setParallelism(1)
    env.execute("source test")
  }
}
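readTextFile reads the file once and the stream then finishes. To keep watching the same path, a sketch along these lines can use readFile with PROCESS_CONTINUOUSLY (the 5-second scan interval is an arbitrary choice; note that a modified file is re-emitted in full):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object FileMonitorTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "E:/qmlidea/flink/src/main/resources/sensor.txt"
    // Re-scan the path every 5 seconds; a modified file is re-processed entirely
    val stream = env.readFile(new TextInputFormat(new Path(path)), path,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L)
    stream.print("file").setParallelism(1)
    env.execute("file monitor test")
  }
}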
 

Reading from elements

import org.apache.flink.streaming.api.scala._

// Case class for the sample records
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 3. Read from individual elements
    env.fromElements(1, 2.0, "string").print()
    env.execute("source test")
  }
}
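Because the three elements have different types (Int, Double, String), the inferred element type here is Any; in practice fromElements is usually called with elements of a single concrete type.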
 

Consuming from Kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// Case class for the sample records
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The sources above are all bounded streams, which can be treated as batch jobs;
    // in production the data is usually an unbounded stream.
    // 4. Read from Kafka (topic "qml")
    val consumer = new FlinkKafkaConsumer[String]("qml", new SimpleStringSchema(), KafkaProperties.getProperties())
    val stream3 = env.addSource(consumer)
    stream3.print("stream").setParallelism(1)
    env.execute("source test")
  }
}

object KafkaProperties {

  def getProperties() = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.0.80:9092")
    properties.setProperty("group.id", "consumer-qml")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1048576")
    properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880")
    properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties
  }
}
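As a possible follow-up inside the same main method, the raw strings can be mapped into SensorReading. The comma-separated "id,timestamp,temperature" record format is an assumption for illustration, not something this article specifies:

// Assumed record format: "sensor1,1547718199,35.8"
val sensorStream = stream3.map { line =>
  val fields = line.split(",").map(_.trim)
  SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
}
sensorStream.print("sensor")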

Custom source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

// Case class for the sample records
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 5. Custom source
    val stream5 = env.addSource(new SensorSource)
    stream5.print("stream").setParallelism(1)
    env.execute("source test")
  }
}

class SensorSource() extends SourceFunction[SensorReading] {
  // volatile, because cancel() may be called from a different thread than run()
  @volatile var running: Boolean = true

  // Generate data
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initialize ten sensor readings around 60 with Gaussian noise
    var curdata = (1 to 10).map(
      i => ("sensor" + i, 60 + rand.nextGaussian() * 10)
    )
    // Keep emitting data until cancelled
    while (running) {

      // Perturb the previous values with a small random walk
      curdata = curdata.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )

      val curTime = System.currentTimeMillis()
      curdata.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )
      Thread.sleep(1000)
    }
  }

  // Stop data generation
  override def cancel(): Unit = {
    running = false
  }
}
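A plain SourceFunction always runs with parallelism 1. If the source needs to run in parallel, a minimal sketch (reusing the same idea, with simplified generation logic) extends ParallelSourceFunction instead; each parallel subtask then runs its own copy of run():

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Random

class ParallelSensorSource extends ParallelSourceFunction[SensorReading] {
  @volatile private var running = true

  // Each parallel subtask runs its own copy of this loop
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      ctx.collect(SensorReading("sensor" + rand.nextInt(10), System.currentTimeMillis(), 60 + rand.nextGaussian() * 10))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}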
 