Flink 数据源 DataSource是这个样子的?(一)

为何要使用 Flink

因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:

Question:既然监听 Kafka 消息,为何不建立一个简单的消息消费者,使用简单的代码就能进行消息的消费?


Answer:在普通的消费者逻辑中,只能做到对传送过来的一条消息进行单条处理。而在 Flink 这个优秀的流计算框架中,能够使用窗口进行多样化处理。提供了窗口处理函数,可以对一段时间(例如 5s 内)或者一批(计数大小,例如 5 个一批)的数据进行计数或者 reduce 整合处理

还有 Flink 拥有状态管理,能够保存 checkpoint,如果程序出现错误,也能够之前的检查点恢复,继续程序的处理,于是拥有这些好处的优秀框架,希望小伙伴也加入进来,一起学习~

1、前言

接下来的几篇文章,都会围绕着下面这张图,整体上来说,就是 DataStreamAPI 编程的练习:Flink 数据源 DataSource是这个样子的?(一)

分别是 SourceTransformationSink 进行逐一学习。


2、DataSource 介绍

直译:数据来源

计算引擎,不管是批出来还是流处理,最重要的是数据来源,根据源源不断的数据进行处理,加工成更有价值的数据。

Flink 官方包中提供了如下基于集合、文件、套接字等 API然后第三方例如 KafkaRabbitMq 等也提供了方便的集成库

由于我们测试时,使用的是 StreamExecutionEnvironment.getExecutionEnvironment() 来获取流执行环境类进行操作,所以我们来看下这个类的返回类型是 DataStreamSource 的方法:

3、集合

集合数据源主要有三种:collectionelementgenerateSequence

  • fromCollection(Collection):接受的参数对象必须是同一类型的集合
  • fromCollection(Iterator<OUT>data, Class<OUT> type):第一个参数是迭代器,第二个参数是指定返回的类型
  • fromElements(Class<OUT> type, OUT... data):第一个参数是指定返回的类型,后面的是不定数量入参,可以输入多个 OUT 类型的对象
  • fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo, String operatorName)从一个可分离的迭代器中创建并行数据源。这个方法是 parallel 并行数据源的底层调用方法,typeInfo 是具体的类型信息,最后一个参数就是操作名字。这个并行数据源并没有测试过,等到之后回来补坑吧。
  • generateSequence(long, long):创建一个包含数字序列的新数据流。例如传进去是 1l 和 10l,那么数据源就是 [1-10]

测试代码如下:

DataSourceFromCollection.java


private static DataStreamSource<Student> collection1(StreamExecutionEnvironment env) {
    List<Student> studentList = Lists.newArrayList(
            new Student(1, "name1", 23, "address1"),
            new Student(2, "name2", 23, "address2"),
            new Student(3, "name3", 23, "address3")
    );
    return env.fromCollection(studentList);
}


private static DataStreamSource<Long> collection2(StreamExecutionEnvironment env) {
    return env.generateSequence(1, 20);
}

4、文件 File

从官方例子中,罗列了以下三个读取文件的方法,第一个返回的文本类型的数据源,第二个数据源是只读取一次文件,第三个方法参数比较多,文档中关于 watchType 观察类型介绍比较多,这里翻译自文档 Flink DataStream API Programming Guide


  • readTextFile(filePath):从 filePath读取文本数据源,文本类型是 TextInputFormat 以及字符串类型是 UTF-8,返回的是文本类型的数据源
  • readFile(fileInputFormat, path):根据指定的文件输入格式读取文件,只读取一次,不随着文本修改重新读取
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):这是前两个内部调用的方法。它根据给定的 fileInputFormat读取路径中的文件。**根据提供的 watchType 对数据源做不同的操作,FileProcessingMode.PROCESS_CONTINUOUSLY 模式下,会定期(每间隔 ms)监视新数据的路径,FileProcessingMode.PROCESS_ONCE 模式下,会一次处理当前路径中的数据并退出。**使用 pathFilter,用户可以进一步从处理文件中排除文件。


4.1、实现 IMPLEMENTATION:


在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。

监视由单个非并行(并行度= 1)任务实现,而读取由并行运行的多个任务执行。

后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据 watchType 定期或仅扫描一次),查找要处理的文件,将它们划分为多个拆分,然后将这些拆分分配给下游读取器 (reader)。

readers 是实际读取到数据的角色。每一个分片 split 只能由一个 reader读取,但是一个 reader 可以读取多个分片 split


4.2、重要笔记 IMPORTANT NOTES:

  1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。 这可能会破坏“完全一次”的语义,因为在文件末尾附加数据将导致重新处理其所有内容。
  2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,则源将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。 当然,读者将继续阅读,直到读取了所有文件内容。关闭源将导致在该点之后没有更多检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

根据上诉两种情况,个人觉得如果用文件数据作为数据源进行测试,那么使用第二种观察模式 FileProcessingMode.PROCESS_ONCE,只扫描一次,避免修改文件后影响之前的计算结果。

DataSourceFromFile.java


// 简单的文字文件输入流
DataStreamSource<String> textFileSource = env.readTextFile(filePath);

// 指定格式和监听类型
Path pa = new Path(filePath);
TextInputFormat inputFormat = new TextInputFormat(pa);
DataStreamSource<String> complexFileSource =
                env.readFile(inputFormat, filePath, 
                FileProcessingMode.PROCESS_CONTINUOUSLY, 
                100L, 
                TypeExtractor.getInputFormatTypes(inputFormat));

5、套接字 Socket

  • socketTextStream:从套接字 socket 读取。元素可以由自定义分隔符 delimiter 进行分隔。

DataSourceFromSocket.java


// 监听端口号
DataStreamSource<String> source = env.socketTextStream("localhost", 9000);

// 定义分隔符
DataStreamSource<String> source = env.socketTextStream("localhost", 9000, "\\W+");

更具体例子可以参考上一篇 Hello World 例子

6、自定义 DataSource

从前面介绍中看到,Flink 提供了一个 addSource(SourceFunction<OUT>) 的方法,其中 SourceFunction 是实现自定义数据源的关键接口,而我们常用来扩展的是它的抽象子类 RichSourceFunction

上一篇:重新定义性能测试: Apache Flink 重磅开源流计算基准测试框架


下一篇:电脑接双屏之分屏输出设置