Flink创建执行环境有四种方式,下图从上到下是
1 获取执行环境
2.创建本地带webui的环境(实验用)
3.创建本地环境
4.创建远程环境
Flink在流处理上的source和在批处理上的source基本一致。大致有五大类:
- 基于本地集合的source(Collection-based-source)
- 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
- 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
前三种是内置的,下面俩种是要add进去的。
- 自定义的source(Custom-source)(用户自定义,实现SourceFunction接口,将该类实例加到SreamExecutionEnvironmentd的addSource方法中就可以了)
- 第三方connector source (Kafka就是典型的第三方Source)
用到最多的就是KafKa Source,其他都是用来实验的玩具
source分为多并行,单并行。
1. socket方式的socketTextStream是单并行的
2. 本地集合有多种方式 fromCollecttion(单并行), fromElements(单并行),fromParallelCollection(多并行), generateSequence(多并行)本地集合一般是有限数据流
单并行的source直接实现了sourceFunction接口,多并行的source直接实现了ParalleSourceFunction或者RichParalleSourceFunction接口。
3.文件方式有readFile(多并行,无限数据流,但是多个task会重复读)和readTextFile(有限数据流,读完就退出)
4.自定义source
自定义source有三种方式 分别是
1.新建一个类实现sourceFunction接口 (单并行)
重写 run方法,cancel方法
2. 新建一个类实现ParallelsourceFunction接口(多并行)
重写 run方法,cancel方法
3.新建一个类实现RichParallelsourceFunction接口 (多并行 ,功能更丰富,使用这种方法最多)
重写 open方法 run方法,cancel 方法 close方法 ,这些方法的执行顺序也是open,run,cancel,close
5. 第三方source
看一下典型的kafkasource调用代码块
public class kafukaSourceDemo {
public static void main(String[] args) throws Exception {
//创建环境
StreamExecutionEnvironment localEnvironmentWithWebUI = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","Hadoop02:9092,Hadoop03:9092,Hadoop04:9092");
properties.setProperty("auto.offset.reset","earliest");
properties.setProperty("group.id","g1");
properties.setProperty("enable.auto.commit","true");
/* properties.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");*/
// KafkaConsumer<Object, Object> kafkaCosumer = new KafkaConsumer<>(""first",properties);
FlinkKafkaConsumer<String> coumser = new FlinkKafkaConsumer<>("order", new SimpleStringSchema(), properties);
DataStreamSource<String> lines = localEnvironmentWithWebUI.addSource(coumser);
lines.print();
localEnvironmentWithWebUI.execute();
}
}