DataStream API介绍和示例
Flink程序运行流程
1. 获取执行环境
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
2. 加载创建初始化数据
readTextFile()
addSource
..
3. 对数据在transformation operator
map
flatMap
filter
..
4. 指定计算结果的输出位置 sink
print()
writeAdText(String path)
addSink
..
5. 触发程序执行 execute
env.execute()
在sink是print时,不需要显示execute,否则会报错。因为在print方法里已经默认调用了execute。
StreamExecutionEnvironment
StreamExecutionEnvironment 作为程序入口context,有两类:LocalStreamEnvironment(本地环境) 和RemoteStreamEnvironment(远程环境)。
ExecutionConfig、CheckpointConfig等配置均在这里初始化。另外,这里也能设置线程数,检查点周期,以及检查点模式。还有状态后端序列化类型以及注册Type等。
- 如果集群是standalone模式,则StreamExecutionEnvironment.getExecutionEnvironment() 相当于StreamExecutionEnvironment.createLocalEnvironment()
DataStream Source
基于文件的
- readTextFile(String path) charsetName 默认用 UTF-8
- readTextFile(String path, String charsetName):文本文件,格式为 TextInputFormat,返回 BasicTypeInfo.STRING_TYPE_INFO ,TextInputFormat对象调用 setCharsetName(charsetName) 设置字符 ,然后底层再调用 readFile 方法。
- readFile(FileInputFormat
inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation typeInformation):根据给定格式和路径读取文件,根据watchType(FileProcessingMode.PROCESS_ONCE:处理一次路径文件后退出,FileProcessingMode.PROCESS_CONTINUOUSLY:检测给定路径的新数据,此时若旧文件发生修改也会重读,不符合exactly-once),interval 扫描路径的周期。调用 createFileInput ,createFileInput 调用 addSource 。
Socket流
- socketTextStream(hostname , port) // 主机,端口号,字段分隔符 delimiter 默认为 \n
- socketTextStream(hostname , port , delimiter) // maxRetry 默认为零
- socketTextStream(hostname, port, delimiter, maxRetry )
maxRetry: 当socket端挂掉是,程序等待的最大重试时间。每秒都会重试连接,为0即停止程序...。利用 SocketTextStreamFunction 生成 sourceFunction对象,调用 addSource 生成DataStreamSource
基于数据集的
都是通过本身的SourceFunction对象调用addSource
- fromCollection(Iterator, class)
- fromCollection(Iterator, TypeInformation)
- fromElements(T...)
- fromParallelCollection
Customer Source 自定义source
- addSource(sourceFunction ) // sourceName 为默认值
- addSource(sourceFunction , sourceName) // typeInfo 为 null
- addSource(sourceFunction , typeInformation) // SourceName 有默认值
- addSource(sourceFunction , sourceName , typeInformation)
自定义source代码示例
package source;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* RichSourceFunction 实现了SourceFunction接口,只做了序列化
* 实现接口SourceFunction或者继承 RichSourceFunction 需要申明返回的数据类型,不然会报错:
* Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
* The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
* Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
*/
public class MyDataSource extends RichSourceFunction<Integer> {
private static final Logger LOG = LoggerFactory.getLogger(MyDataSource.class);
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception{
while (isRunning){
Thread.sleep(300);
int rnd = (int) (Math.random() * 10);
LOG.info("emit data:"+rnd);
ctx.collect( rnd );
}
}
@Override
public void cancel() {
isRunning = false;
}
}
DataStream Transformations
Map [DataStream -> DataStream]
对数据集内每条数据都进行相同的规则处理,常用来做清洗和转换数据格式等
DataStream<Tuple2<String, Integer>> windowCount
.map(new MapFunction<Tuple2<String,Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
FlatMap [DataStream -> DataStream]
将数据集进行打平,即按照逻辑合并在一个或多个数据集里面
DataStream<Tuple2<String, Integer>> windowCount = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word : value.split("\\s")) {
collector.collect(Tuple2.of(word, 1));
}
}
})
Filter [DataStream -> DataStream]
过滤数据,符合要求的数据返回 true,不符合要求的返回 false
text
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.contains("h");
}
})
KeyBy [DataStream -> KeyedStream]
在数据集中进行 Partition操作,将相同的key值的数据放到相同的分区中,返回 keyedStream。
指定key时可以按位置指定,也可以按名称指定(此时需要pojo类、case class等明确了字段位置的)
注意以下类型不能成为key:
- POJO类型但是不覆盖 hashCode() 方法并依赖于Object.hashCode() 实现
- 任何类型的数组
Reduce [KeyedStream -> DataStream]
定义聚合逻辑,对数据进行聚合处理,其聚合逻辑要求满足运算结合律和交换律。当前元素的值和最后一个Reduce的值进行组合并返回出新的Reduce的值。
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1);
}
})
Aggregate [keyedStream -> DataStream]
聚合算子,将Reduce算子中的函数进行了封装。封装的操作包括 sum、min、minBy、max、maxBy等
Fold [keyedStream -> DataStream]
将数据进行滚动折叠,可指定开始值。未来将取消,全部用Aggregate替代
Union
合并操作,要求两个 DataStream 数据格式一样
Connect
不要求格式一样,类似拼接格式操作,返回 ConnectedStreams。
比如 (String,Int) connect (Int) 结果: ((String, Int), Int)
ConnectedStreams不能直接print,需要使用CoMapFunction 或CoFlatMapFunction分别处理DataStrea,处理后返回的数据类型必须保持一致。
Split [DataStream -> SplitStream]
Union算子的逆向实现
Select [SplitStream -> DataStream]
Select是splitStream的方法,split 只是进行标记,并未进行切分。select切分数据集。
SplitStream<String> split = text.split(new OutputSelector<String>() {
// 切分数据的时候给每部分数据打上标记
@Override
public Iterable<String> select(String value) {
ArrayList<String> strings = new ArrayList<>();
if (value.contains("h"))
strings.add("hadoop");
else
strings.add("noHadoop");
return strings;
}
});
// 打印有 hadoop 标签的数据
split.select("hadoop").print();
// 打印有 noHadoop 标签的数据
split.select("noHadoop")
.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
})
.print();
Partition类 transformation
- shuffle: 随机分配,分区相对均衡,容易失去原有数据分区结构
- rebalance: 尽可能保证每个分区的数据平衡,多用于数据倾斜
- rescale: 待定