5.5.3 富函数(Rich Functions)
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。
它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- …
Rich Function有一个生命周期的概念。典型的生命周期方法有:
open()
方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()
会被调用。close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。getRuntimeContext()
方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {
@Override public Tuple2<Integer, String> map(SensorReading value) throws Exception {
return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
}
@Override public void open(Configuration parameters) throws Exception {
System.out.println("my map open"); // 以下可以做一些初始化工作,例如建立一个和HDFS的连接
}
@Override public void close() throws Exception {
System.out.println("my map close"); // 以下做一些清理工作,例如断开和HDFS的连接
}
}
测试代码:
package com.zch.apitest.transform;
import com.zch.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author: zhaoHui
* Date: 2022/01/22
* Time: 13:29
* Description:
*/
public class TransformTest5_RichFunction {
public static void main(String[] args) throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStream<String> inputStream = env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(lines -> {
String[] split = lines.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
DataStream<Tuple2<String,Integer>> resultStream = dataStream.map(new MyMapper());
resultStream.print();
env.execute();
}
public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(),value.getId().length());
}
}
// 实现自定义富函数类方法
public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(),getRuntimeContext().getAttemptNumber());
}
@Override
public void open(Configuration parameters) throws Exception {
// 初始化工作,一般是给定义状态,或者建立数据库连接
System.out.println("open");
}
@Override
public void close() throws Exception {
// 一般是关闭连接和清空状态的收尾操作
System.out.println("close");
}
}
}