Flink从入门到入土(中)

3.1.2 RichMapFunction


所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction


sensor-data.log 文件数据 同上一致


import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从文件读取数据
 */
object Transform_RichMapFunction {
  def main(args: Array[String]): Unit = {
    //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从指定路径获取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction)
    //3.打印
    myMapDS.print()
    //4.执行
    env.execute("map")
  }
  /**
   * 自定义继承 MapFunction
   * MapFunction[T,O]
   * 自定义输入和输出
   *
   */
  class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{
    override def map(value: String): WaterSensor = {
      val datas: Array[String] = value.split(",")
      //      WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt)
    }
    // 富函数提供了生命周期方法
    override def open(parameters: Configuration): Unit = {}
    override def close(): Unit = {}
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


Rich Function有一个生命周期的概念。典型的生命周期方法有:


  • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用


  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作


  • getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行         的并行度,任务的名字,以及state状态


3.1.3 flatMap


  • 扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素


  • 参数:Scala匿名函数或FlatMapFunction


  • 返回:DataStream


Flink从入门到入土(中)


import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_FlatMap {

  def main(args: Array[String]): Unit = {

    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.读取数据
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )

    val resultDS: DataStream[Int] = listDS.flatMap(list => list)

    resultDS.print()


    // 4. 执行
    env.execute()
  }


}


Flink从入门到入土(中)


3.2. filter


  • 过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
  • 参数:Scala匿名函数或FilterFunction
  • 返回:DataStream


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:Filter
 */
object Transform_Filter {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2.读取数据
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4,1, 2, 3, 4),
        List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )
    // true就留下,false就抛弃
    listDS.filter(num => {
      num.size>5
      })
      .print("filter")
    // 4. 执行
    env.execute()
  }
}



Flink从入门到入土(中)


3.3 keyBy


在Spark中有一个GroupBy的算子,用于根据指定的规则将数据进行分组,在flink中也有类似的功能,那就是keyBy,根据指定的key对数据进行分流


  • 分流:根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的


Flink从入门到入土(中)


  • 参数:Scala匿名函数或POJO属性或元组索引,不能使用数组


  • 返回:KeyedStream


Flink从入门到入土(中)


import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_KeyBy {

  def main(args: Array[String]): Unit = {

    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    //3.转换为样例类
    val mapDS = sensorDS.map(
      lines => {
        val datas = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    // 4. 使用keyby进行分组
    // TODO 关于返回的key的类型:
    // 1. 如果是位置索引 或 字段名称 ,程序无法推断出key的类型,所以给一个java的Tuple类型
    // 2. 如果是匿名函数 或 函数类 的方式,可以推断出key的类型,比较推荐使用
    // *** 分组的概念:分组只是逻辑上进行分组,打上了记号(标签),跟并行度没有绝对的关系
    //      同一个分组的数据在一起(不离不弃)
    //      同一个分区里可以有多个不同的组

    //        val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0)
    //    val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id")
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    //    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(
    //      new KeySelector[WaterSensor, String] {
    //        override def getKey(value: WaterSensor): String = {
    //          value.id
    //        }
    //      }
    //    )

    sensorKS.print().setParallelism(5)

    // 4. 执行
    env.execute()
  }

  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


Flink从入门到入土(中)


3.4 shuffle


  • 打乱重组(洗牌):将数据按照均匀分布打散到下游
  • 参数:无
  • 返回:DataStream


Flink从入门到入土(中)


import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_Shuffle {

  def main(args: Array[String]): Unit = {

    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val shuffleDS = sensorDS.shuffle

    sensorDS.print("data")

    shuffleDS.print("shuffle")
    // 4. 执行
    env.execute()
  }
}


Flink从入门到入土(中)


3.5. split


在某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。



Flink从入门到入土(中)


需求:将水位传感器数据按照空高高低(以40cm,30cm为界),拆分成三个流


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_Split {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    // 3.转换成样例类
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitSS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("normal")
        } else if (sensor.vc < 80) {
          Seq("Warn")
        } else {
          Seq("alarm")
        }
      }
    )
    // 4. 执行
    env.execute()
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


3.6 select


将数据流进行切分后,如何从流中将不同的标记取出呢,这时就需要使用select算子了。


Flink从入门到入土(中)


import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_Split {

  def main(args: Array[String]): Unit = {

    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.转换成样例类
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitDS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("info")
        } else if (sensor.vc < 80) {
          Seq("warn")
        } else {
          Seq("error")
        }
      }
    )
    val errorDS: DataStream[WaterSensor] = splitDS.select("error")
    val warnDS: DataStream[WaterSensor] = splitDS.select("warn")
    val infoDS: DataStream[WaterSensor] = splitDS.select("info")

    infoDS.print("info")
    warnDS.print("warn")
    errorDS.print("error")

    // 4. 执行
    env.execute()
  }

  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


Flink从入门到入土(中)

上一篇:对Python中一些“坑”的总结及技巧


下一篇:Python中使用class(),面向对象有什么优势