执行环境和source

Flink创建执行环境有四种方式,下图从上到下是

1 获取执行环境

2.创建本地带webui的环境(实验用)

3.创建本地环境

4.创建远程环境

执行环境和source

 

Flink在流处理上的source和在批处理上的source基本一致。大致有五大类:

  • 基于本地集合的source(Collection-based-source)
  • 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
  • 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。

前三种是内置的,下面俩种是要add进去的。

  • 自定义的source(Custom-source)(用户自定义,实现SourceFunction接口,将该类实例加到SreamExecutionEnvironmentd的addSource方法中就可以了)
  • 第三方connector source (Kafka就是典型的第三方Source)

用到最多的就是KafKa Source,其他都是用来实验的玩具

source分为多并行,单并行。

1. socket方式的socketTextStream是单并行的 

2. 本地集合有多种方式 fromCollecttion(单并行), fromElements(单并行),fromParallelCollection(多并行), generateSequence(多并行)本地集合一般是有限数据流

单并行的source直接实现了sourceFunction接口,多并行的source直接实现了ParalleSourceFunction或者RichParalleSourceFunction接口。

3.文件方式有readFile(多并行,无限数据流,但是多个task会重复读)和readTextFile(有限数据流,读完就退出)

执行环境和source

执行环境和source

4.自定义source 

 自定义source有三种方式 分别是

1.新建一个类实现sourceFunction接口 (单并行)

重写 run方法,cancel方法

2. 新建一个类实现ParallelsourceFunction接口(多并行)

重写 run方法,cancel方法

3.新建一个类实现RichParallelsourceFunction接口 (多并行 ,功能更丰富,使用这种方法最多)

重写 open方法 run方法,cancel 方法 close方法 ,这些方法的执行顺序也是open,run,cancel,close

5. 第三方source

看一下典型的kafkasource调用代码块

public class kafukaSourceDemo {
    public static void main(String[] args) throws Exception {
        //创建环境
        StreamExecutionEnvironment localEnvironmentWithWebUI = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());


        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers","Hadoop02:9092,Hadoop03:9092,Hadoop04:9092");
        properties.setProperty("auto.offset.reset","earliest");
        properties.setProperty("group.id","g1");
        properties.setProperty("enable.auto.commit","true");
      /*  properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");*/
      //  KafkaConsumer<Object, Object> kafkaCosumer = new KafkaConsumer<>(""first",properties);
        FlinkKafkaConsumer<String> coumser = new FlinkKafkaConsumer<>("order", new SimpleStringSchema(), properties);

        DataStreamSource<String> lines = localEnvironmentWithWebUI.addSource(coumser);

        lines.print();

        localEnvironmentWithWebUI.execute();

    }
}
上一篇:python之 文件读与写


下一篇:spring boot 集成 kaptcha