0 简介
Flink程序所处理的流中的事件一般是对象类型。操作符接收对象输出对象。所以Flink的内部机制需要能够处理事件的类型。在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。为了高效的进行此类操作,Flink需要流中事件类型的详细信息。Flink使用了Type Information
的概念来表达数据类型,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。
Flink也能够通过分析输入数据和输出数据来自动获取数据的类型信息以及序列化器和反序列化器。尽管如此,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。
在这一节中,我们将讨论Flink支持的类型,以及如何为数据类型创建相应的类型信息,还有就是在Flink无法推断函数返回类型的情况下,如何帮助Flink的类型系统去做类型推断。
1 支持的数据类型
Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类:
- Primitives(原始数据类型)
- Java和Scala的Tuples(元组)
- Scala的样例类
- POJO类型
- 一些特殊的类型
接下来让我们一探究竟。
Primitives
Java和Scala提供的所有原始数据类型都支持,例如Int
(Java的Integer
),String,Double等等。下面举一个例子:
DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L); numbers.map(n -> n + 1);
Tuples
元组是一种组合数据类型,由固定数量的元素组成。
Flink为Java的Tuple提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。
DataStream<Tuple2<String, Integer>> persons = env .fromElements( Tuple2.of("Adam", 17), Tuple2.of("Sarah", 23) ); persons.filter(p -> p.f1 > 18);
Tuple的元素可以通过它们的public属性访问——f0,f1,f2等等。或者使用getField(int pos)
方法来访问,元素下标从0开始:
import org.apache.flink.api.java.tuple.Tuple2 Tuple2<String, Integer> personTuple = Tuple2.of("Alex", 42); Integer age = personTuple.getField(1); // age = 42
不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子:
personTuple.f1 = 42; // set the 2nd field to 42 personTuple.setField(43, 1); // set the 2nd field to 43
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }); wordCounts.keyBy(value -> value.f0);
POJO
POJO类的定义:
- 公有类
- 无参数的公有构造器
- 所有的字段都是公有的,可以通过getters和setters访问。
- 所有字段的数据类型都必须是Flink支持的数据类型。
举个例子:
public class Person { public String name; public int age; public Person() {} public Person(String name, int age) { this.name = name; this.age = age; } } DataStream<Person> persons = env.fromElements( new Person("Alex", 42), new Person("Wendy", 23) );
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy(value -> value.word);
其他数据类型
- Array, ArrayList, HashMap, Enum
- Hadoop Writable types
2 为数据类型创建类型信息
Flink类型系统的核心类是TypeInformation
。它为系统在产生序列化器和比较操作符时,提供了必要的类型信息。例如,如果我们想使用某个key来做联结查询或者分组操作,TypeInformation
可以让Flink做更严格的类型检查。
Flink针对Java和Scala分别提供了类来产生类型信息。在Java中,类是
org.apache.flink.api.common.typeinfo.Types
举个例子:
TypeInformation<Integer> intType = Types.INT; TypeInformation<Tuple2<Long, String>> tupleType = Types .TUPLE(Types.LONG, Types.STRING); TypeInformation<Person> personType = Types .POJO(Person.class);