本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:
Flink大数据项目实战:http://t.cn/EJtKhaz
1. 继续侃Flink编程基本套路
1.1 DataSet and DataStream
DataSet and DataStream表示Flink app中的分布式数据集。它们包含重复的、不可变数据集。DataSet有界数据集,用在Flink批处理。DataStream可以是*,用在Flink流处理。它们可以从数据源创建,也可以通过各种转换操作创建。
1.2共同的编程套路
DataSet and DataStream 这里以WordCount为例,共同的编程套路如下所示:
1.获取执行环境(execution environment)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.加载/创建初始数据集
// 读取输入数据
DataStream<String> text;
if (params.has("input")) {
// 读取text文件
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// 读取默认测试数据集
text = env.fromElements(WordCountData.WORDS);
}
3.对数据集进行各种转换操作(生成新的数据集)
DataStream<Tuple2<String, Integer>> counts =
// 切分每行单词
text.flatMap(new Tokenizer())
//对每个单词分组统计词频数
.keyBy(0).sum(1);
4.指定将计算的结果放到何处去
// 输出统计结果
if (params.has("output")) {
//写入文件地址
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
//数据打印控制台
counts.print();
}
5.触发APP执行
// 执行flink 程序
env.execute("Streaming WordCount");
1.3惰性计算
Flink APP都是延迟执行的,只有当execute()被显示调用时才会真正执行,本地执行还是在集群上执行取决于执行环境的类型。好处:用户可以根据业务构建复杂的应用,Flink可以整体进优化并生成执行计划。
2. 指定键(Specifying Keys)
2.1谁需要指定键
哪些操作需要指定key呢?常见的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。
Flink编程模型的key是虚拟的,不需要你创建键值对,可以在具体算子通过参数指定,如下代码所示:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
2.2为Tuple定义键
Tuple定义键的方式有很多种,接下来我们一起看几个示例:
按照指定属性分组
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:此时表示使用Tuple3三元组的第一个成员作为keyBy
按照组合键进行分组
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
注意:此时表示使用Tuple3三元组的前两个元素一起作为keyBy
特殊情况:嵌套Tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:这里使用KeyBy(0)指定键,系统将会使用整个Tuple2作为键(整型和浮点型的)。如果想使用Tuple2内部字段作为键,你可以使用字段来表示键,这种方法会在后面阐述。
2.3使用字段表达式定义键
基于字符串的字段表达式可以用来引用嵌套字段(例如Tuple,POJO)
public class WC {
public String word;
public User user;
public int count;
}
public class User{
public int age;
public String zip;
}
示例:通过word字段进行分组
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
语法:
1.直接使用字段名选择POJO字段
例如 user 表示 一个POJO的user字段
2.Tuple通过offset来选择
"_1"和"5"分别代表第一和第六个Scala Tuple字段
“f0” and “f5”分别代表第一和第六个Java Tuple字段
3.选择POJO和Tuples的嵌套属性
user.zip
在scala里你可以"_2.user.zip"或"user._4.1.zip”
在java里你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表达式选择所有属性,java为“*”,scala为 "_"。不是POJO或者Tuple的类型也适用。
2.4字段表达式实例-Java
以下定义两个Java类:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我们一起看看如下key字段如何理解:
1."count": wc 类的count字段
2."complex":递归的选取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass类中的tuple word的第三个字段;
4."complex.hadoopCitizen":选择Hadoop IntWritable类型。
2.5字段表达式实例-Scala
以下定义两个Scala类:
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
我们一起看看如下key字段如何理解:
1."count": wc 类的count字段
2."complex":递归的选取ComplexNestedClass的所有字段
3."complex.word._3": ComplexNestedClass类中的tuple word的第三个字段;
4."complex.hadoopCitizen":选择Hadoop IntWritable类型。
2.6 Key Selector Functions
还有一种定义键的方式叫做“键选择器”函数。键选择器函数需要一个元素作为入参,返回这个元素的键。这个键可以是任何类型的,也可从指定计算中生成。
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) {
return wc.word;
}
});
3. 自定义转换函数
3.1实现接口
大多数的转换操作需要用户自己定义函数,可以通过实现MapFunction接口,并重写map函数来实现。
3.2匿名类
也可以直接使用匿名类,不需要定义类名称,直接new接口重写map方法即可。
3.3 Lambda表达式
使用Lambda表达式比自定义函数更方便,更直接。
3.4 Rich Functions
遇到特殊的需求,比如读取数据库中的数据,如果数据库连接放在map函数里面迭代循环,实现谱图mapFunction接口无法满足要求。
我们需要继承RichMapFunction,将获取数据库连接放在open方法中,具体转换放在map方法中。
当然它也可以使用匿名类:
Rich Function拥有非常有用的四个方法:open,close,getRuntimeContext和setRuntimecontext
这些功能在参数化函数、创建和确定本地状态、获取广播变量、获取运行时信息(例如累加器和计数器)和迭代信息时非常有帮助。
4. 支持的数据类型
Flink对DataSet和DataStream中可使用的元素类型添加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略和选择不同的序列化方式。
有7中不同的数据类型:
1.Java Tuple 和 Scala Case类;
2.Java POJO;
3.基本类型;
4.通用类;
5.值;
6.Hadoop Writables;
7.特殊类型
4.1Java Tuple
Tuple是包含固定数量各种类型字段的复合类。Flink Java API提供了Tuple1-Tuple25。Tuple的字段可以是Flink的任意类型,甚至嵌套Tuple。
访问Tuple属性的方式有以下两种:
1.属性名(f0,f1…fn)
2.getField(int pos)
4.2Scala Case类
Scala的Case类(以及Scala的Tuple,实际是Case class的特殊类型)是包含了一定数量多种类型字段的组合类型。Tuple字段通过他们的1-offset名称定位,例如 _1代表第一个字段。Case class 通过字段名称获得:
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
4.3POJOs
Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:
1.是公共类;
2.无参构造是公共的;
3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);
4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。
Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) { this.word = word; this.count = count; }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word");
4.4基本类型
Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double。
4.5一般通用类
Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。
所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。
4.6值类型Values
通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。
Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。
4.7 Hadoop的Writable类
它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。
4.8特殊类型
Flink比较特殊的类型有以下两种:
1.Scala的 Either、Option和Try。
2.Java ApI有自己的Either实现。
4.9类型擦除和类型推理
注意:本小节内容仅针对Java
Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。
例如,在JVM中,DataStream<String>和DataStream<Long>的实例看起来是相同的。
List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());
泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。
Flink 的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。
5. 累加器和计数器
5.1累加器和计数器
计数器是最简单的累加器。
内置累加器主要包含以下几类:
1.IntCounter, LongCounter 和 DoubleCounter
2.Histogram(柱状图)
5.2如何使用累加器
第一步:在自定义的转换操作里创建累加器对象:
private IntCounter numLines = new IntCounter();
第二步:注册累加器对象,通常是在rich function的open()方法中。这里你还需要定义累加器的名字getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
第三步:在operator函数的任何地方使用累加器,包括在open()和close()方法中
this.numLines.add(1);
第四步:结果存储在JobExecutionResult里:
JobExecutionResult JobExecutionResult =env.execute("Flink Batch Java API Skeleton")
myJobExecutionResult.getAccumulatorResult("num-lines")
5.3自定义累加器
为了实现你自己的累加器,我们需要实现Accumulator接口,如果你想让你自定义的累加器需要被Flink所收录,请创建一个提交请求。可以选择实现Accumulator或者SimpleAccumulator。
1.Accumulator<V, R>是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R,例如:对于一个histogram,v是数值类型的而R是一个histogram。
2.SimpleAccumulator则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。