聊聊Java类型擦除、Flink中使用Lambda表达式丢失信息和Flink类型暗示机制

最近在学Flink时发现,由于Java类型擦除的存在,导致Flink中使用Lambda表达式时,无法检测出泛型的类型,需要使用Flink类型暗示(type hint)机制才能解决。现在我们就深入剖析一下吧!

什么是Java泛型擦除

本文不介绍Java的泛型,对泛型不太了解的同学强烈推荐这篇博客:https://www.cnblogs.com/coprince/p/8603492.html

看两个例子:

(1)例1

List arrayList = new ArrayList();
arrayList.add("abc");
arrayList.add(12);

for(int i = 0; i< arrayList.size();i++){
	String item = (String)arrayList.get(i);
	System.out.println(item);
}

运行报错:

Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String

因为我们没有指定泛型的类型,所以在List中可以存放任意类型的数据。上述代码先在List中添加了一个String类型的数据,后添加了一个Integer类型的数据,编译器不会提示任何错误,但运行时却报错了。

这是因为List以第一次添加的数据类型为准,即以String的方式使用,后面再添加Integer类型的数据,程序就崩溃了。为了在编译阶段解决类似的问题,我们可以在代码中执行泛型的类型:

List<String> arrayList = new ArrayList<String>();
//arrayList.add(100); 在编译阶段,编译器提示错误

(2)例2

List<String> stringArrayList = new ArrayList<String>();
List<Integer> integerArrayList = new ArrayList<Integer>();

Class classStringArrayList = stringArrayList.getClass();
Class classIntegerArrayList = integerArrayList.getClass();

System.out.println(classStringArrayList==classIntegerArrayList);

输出结果:true

通过上面的例子可以证明,在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦出,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

泛型擦除有两种方式,Java使用的是第一种方式,C++和C#使用的是第二种方式

  • 方式一:Code sharing。对同一个原始类型下的泛型类型只生成同一份目标代码
  • 方式二:Code specialization。对每一个泛型类型都生成不同的目标代码。

它们也分别俗称“假”泛型和“真”泛型。导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了List,实际上都是Class<? extends ArrayList>的比较,导致例2输出的true

为什么Java要采用Code sharing机制进行类型擦除呢?有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前JVM已经在无泛型的条件下经历了较长时间的发展,如果采用Code specialization,就得对JVM的类型系统做伤筋动骨的改动,并且无法保证向前兼容性。二是Code specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的List,就要生成10份字节码,造成代码膨胀。

类型擦除对Flink的影响

来看一段简单的代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1));
mapDataStream.print();
env.execute();

程序运行报错,错误原因如下:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.

意思是说Tuple2中的参数类型缺失,这很可能是因为lambda表达式不能提供足够的信息,使得无法自动检测出Tuple2中的参数类型,建议我们使用匿名内部类代替。

我们换成匿名匿名内部类实现:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2<>(value,1);
    }
});

mapDataStream.print();
env.execute();

上述代码成功执行并打印输出。

为什么使用lambda表达式,JVM就无法自动检测出Tuple2中的参数类型,而匿名内部类却可以?

Tuple2中是有两个泛型的,使用匿名内部类时,会被真正编译为class文件,在对象进入JVM和离开JVM的边界处进行类型的检查和转换,从而保证Tuple2的参数类型能够正确的被检测出来。这种方式其实是静态语言的特性。

而Lambda表达式是在运行时调用invokedynamic指令,用以支持动态语言的方法调用。具体来说,它将调用点(CallSite)抽象成一个 Java 类,并且将原本由 Java 虚拟机控制的方法调用以及方法链接暴露给了应用程序。在运行过程中,每一条 invokedynamic 指令将捆绑一个调用点,并且会调用该调用点所链接的方法句柄。在第一次执行 invokedynamic 指令时,Java 虚拟机会调用该指令所对应的启动方法(BootStrap Method),来生成前面提到的调用点,并且将之绑定至该 invokedynamic 指令中。在之后的运行过程中,Java 虚拟机则会直接调用绑定的调用点所链接的方法句柄。亦即在第一次执行其逻辑时才会确定。但是,对象进入JVM后,就会进行类型擦除,导致没有足够的信息检测出Tuple2中两个泛型的具体类型。

上面的说法可能让人有点模糊,需要懂得JVM invokedynamic的原理(哈哈,其实我也没有深入挖,有机会再补)。

为了克服类型擦除带来的问题,Flink类型系统中提供了类型暗示(type hint)机制。在map之后调用returns方法,就可以指定返回类型了。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapDataStream.print();
env.execute();

另外,对于确定的数据类型(即没有泛型的数据类型),可以随意在flink中使用lambda表达式。例如:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<String> mapDataStream = dataStream.map(word -> word+"_1");

mapDataStream.print();
env.execute();

上述代码就正常执行。

参考资料

  1. https://www.cnblogs.com/coprince/p/8603492.html
  2. https://time.geekbang.org/column/article/12564
  3. https://time.geekbang.org/column/article/12574
  4. https://zhuanlan.zhihu.com/p/26389041
  5. https://blog.csdn.net/nazeniwaresakini/article/details/104220123
上一篇:xshell 所选的用户密钥未在远程主机上注册;无法加载密钥


下一篇:有道词典_每日一句_2022/02