DataStream API(一)
在了解DataStream API之前我们先来了解一下Flink API的构成。Flink API是分层的。由最底层的Stateful Stream Process到最顶层的SQL分为四层。如下图:
DataStream API 顾名思义,就是DataStream类的API,DataStream表示Flink程序中的流式数据集合。它是一个包含重复项的不可变数据集合,这些数据可以是有界的也可以是*的,处理他们的API是相同的。
DataStream是不可变的,这意味着一旦它们被创建,就不能添加或删除元素。也不能简单地检查内部的元素,而只能使用DataStream API(Transform)来处理它们。
Flink程序基本部分组成:
- 获得执行环境(Environment),
- 加载/创建初始数据(Source),
- 指定此数据的转换(Transform),
- 指定计算结果的存放位置(Sink),
- 触发程序执行(Execut)
下面我们一起来了解一下Flink DataStream的执行环境。
Environment
Flink的执行环境包括两种,分别是StreamExecutionEnvironment和ExecutionEnvironment,他们分别对应StreamData和DataSet。StreamData是流式数据集,DataSet是批量数据集。
StreamExecutionEnvironment是所有Flink流式处理程序的基础。它为我们提供了三种实例化的方法,分别是:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
createLocalEnvironment()
这个方法是获取本地的执行环境。它有两个重载,分别是:
//parallelism表示并行度
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration configuration)
createRemoteEnvironment(String host, int port, String... jarFiles)
这个方法是获取集群的执行环境。与createLocalEnvironment()类似,它也有两个重载,分别是:
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
createRemoteEnvironment(String host, int port, Configuration clientConfig, String... jarFiles)
getExecutionEnvironment()
getExecutionEnvironment()可以自行判断我们当前程序的执行环境并为我们返回与之相对应的实例。换句话说,通常情况下我们不需要自己判断到底是使用createLocalEnvironment还是使用createRemoteEnvironment,一律用getExecutionEnvironment就OK了。
Environment实例
通过Environment实例我们可以做很多事情,比如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//获取数据源
//当然获取数据源的方式有很多种,下面会一一介绍
env.readTextFile("FilePath");
//执行程序
env.Execute();
//...
Data Sources
Data Source的核心组件包括三个,分别是Split、SourceReader、SplitEnumerator。
- Split:Split是数据源的一部分,例如:文件的一部分或者一条日志。Splits是Source并行读取数据和分发作业的基础。
- SourceReader:SourceReader向SplitEnumerator请求Splits并处理请求到的Splits。SourceReader位于TaskManager中,这意味着它是并行运行的,同时,它可以产生并行的事件流/记录流。
- SplitEnumerator:SplitEnumerator负责管理Splits并且将他们发送给SourceReader。它是运行在JobManager上的单个实例。负责维护正在进行的分片的备份日志并将这些分片均衡的分发到SourceReader中。
具体的过程可以参考以下图片:
下面我们来介绍一下几个常用的获取数据源的方式。
从集合中读取
ArrayList<String> strList = new ArrayList<String>();
strList.add("are");
strList.add("you");
strList.add("ok");
env.fromCollection(strList);
从文件中读取
DataStreamSource<String> inputData = env.readTextFile("FilePath");
消费kafka中的数据
引入kafka依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.0</version>
</dependency>
代码示例
//kafka配置
Properties pro = new Properties();
pro.setProperty("bootstrap.servers", "localhost:9092");
pro.setProperty("group.id", "consumer-group");
pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pro.setProperty("auto.offset.reset", "latest");
//消费kafka数据
DataStreamSource inputStream = env.addSource(new FlinkKafkaConsumer("topic", new SimpleStringSchema(), pro));
当然,出了kafka之外Flink还为我们提供了ElasticSearch、HDFS、RabbitMQ、JDBC等作为数据源的接口。此处就不一一介绍了。
自定义数据源
其实深入研究之后我们会发现,FlinkKafkaConsumer其实是实现了一个SourceFunction接口。so,我们可以通过实现SourceFunction的方式来自定义我们自己的数据源。
有了这个功能我们可以很轻松的模拟真实的业务场景。毕竟,绝大多数的项目在开发阶段并不会有真实的业务场景来提供数据源。
DataStreamSource inputStream = env.addSource(new MyDataSource());
private static class MyDataSource implements SourceFunction<Sensor> {
private boolean running = true;
public void run(SourceContext sourceContext) throws Exception {
while(running){
//读取数据,可以从cvs文件...自定义数据源读取数据
Thread.sleep(100);
}
}
public void cancel() {
running = false;
}
}