3.1.2 RichMapFunction
所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
sensor-data.log 文件数据 同上一致
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从文件读取数据 */ object Transform_RichMapFunction { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从指定路径获取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction) //3.打印 myMapDS.print() //4.执行 env.execute("map") } /** * 自定义继承 MapFunction * MapFunction[T,O] * 自定义输入和输出 * */ class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{ override def map(value: String): WaterSensor = { val datas: Array[String] = value.split(",") // WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt) } // 富函数提供了生命周期方法 override def open(parameters: Configuration): Unit = {} override def close(): Unit = {} } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
Rich Function有一个生命周期的概念。典型的生命周期方法有:
- open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行 的并行度,任务的名字,以及state状态
3.1.3 flatMap
- 扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素
- 参数:Scala匿名函数或FlatMapFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_FlatMap { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val listDS: DataStream[List[Int]] = env.fromCollection( List( List(1, 2, 3, 4), List(5, 6, 7,1,1,1) ) ) val resultDS: DataStream[Int] = listDS.flatMap(list => list) resultDS.print() // 4. 执行 env.execute() } }
3.2. filter
- 过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
- 参数:Scala匿名函数或FilterFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:Filter */ object Transform_Filter { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val listDS: DataStream[List[Int]] = env.fromCollection( List( List(1, 2, 3, 4,1, 2, 3, 4), List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4), List(1, 2, 3, 4), List(5, 6, 7,1,1,1), List(1, 2, 3, 4), List(5, 6, 7,1,1,1) ) ) // true就留下,false就抛弃 listDS.filter(num => { num.size>5 }) .print("filter") // 4. 执行 env.execute() } }
3.3 keyBy
在Spark中有一个GroupBy的算子,用于根据指定的规则将数据进行分组,在flink中也有类似的功能,那就是keyBy,根据指定的key对数据进行分流
- 分流:根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的
- 参数:Scala匿名函数或POJO属性或元组索引,不能使用数组
- 返回:KeyedStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_KeyBy { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") //3.转换为样例类 val mapDS = sensorDS.map( lines => { val datas = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) // 4. 使用keyby进行分组 // TODO 关于返回的key的类型: // 1. 如果是位置索引 或 字段名称 ,程序无法推断出key的类型,所以给一个java的Tuple类型 // 2. 如果是匿名函数 或 函数类 的方式,可以推断出key的类型,比较推荐使用 // *** 分组的概念:分组只是逻辑上进行分组,打上了记号(标签),跟并行度没有绝对的关系 // 同一个分组的数据在一起(不离不弃) // 同一个分区里可以有多个不同的组 // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0) // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id") val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) // val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy( // new KeySelector[WaterSensor, String] { // override def getKey(value: WaterSensor): String = { // value.id // } // } // ) sensorKS.print().setParallelism(5) // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.4 shuffle
- 打乱重组(洗牌):将数据按照均匀分布打散到下游
- 参数:无
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_Shuffle { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val shuffleDS = sensorDS.shuffle sensorDS.print("data") shuffleDS.print("shuffle") // 4. 执行 env.execute() } }
3.5. split
在某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。
需求:将水位传感器数据按照空高高低(以40cm,30cm为界),拆分成三个流
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_Split { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.转换成样例类 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val splitSS: SplitStream[WaterSensor] = mapDS.split( sensor => { if (sensor.vc < 40) { Seq("normal") } else if (sensor.vc < 80) { Seq("Warn") } else { Seq("alarm") } } ) // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.6 select
将数据流进行切分后,如何从流中将不同的标记取出呢,这时就需要使用select算子了。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_Split { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.转换成样例类 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val splitDS: SplitStream[WaterSensor] = mapDS.split( sensor => { if (sensor.vc < 40) { Seq("info") } else if (sensor.vc < 80) { Seq("warn") } else { Seq("error") } } ) val errorDS: DataStream[WaterSensor] = splitDS.select("error") val warnDS: DataStream[WaterSensor] = splitDS.select("warn") val infoDS: DataStream[WaterSensor] = splitDS.select("info") infoDS.print("info") warnDS.print("warn") errorDS.print("error") // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }