1、Transform
1.1 map
val streamMap = stream.map { x => x * 2 }
1.2 flatmap
flatMap的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i))
结果是List(1,1,2,2,3,3)
而List("a b", "c d").flatMap(line ⇒ line.split(" "))
结果是List(a, b, c, d)
val streamFlatMap = stream.flatMap{ x => x.split(" ") }
1.3 filter
val streamFilter = stream.filter{ x => x == 1 }
1.4 keyby
DataStream→ KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
val streamKeyby = stream.keyBy(0)
1.5 滚动聚合算子(Rolling Aggregation)
这些算子可以针对KeyedStream的每一个支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
1.6 reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val dataDS: DataStream[String] = env.readTextFile("input/data.txt") val ds: DataStream[WaterSensor] = dataDS.map( s => { val datas = s.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble) } ).keyBy(0).reduce( (s1, s2) => { println(s"${s1.vc} <==> ${s2.vc}") WaterSensor(s1.id, s1.ts, math.max(s1.vc, s2.vc)) } ) ds.print() env.execute("sensor")
1.7 split & select
Split算子
DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。
val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )
Select算子
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
val even = split select("even") val odd = split select("odd") val all = split.select("even","odd")
1.8 Connect & CoMap
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)
connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false )
1.9 Union
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
dataStream.union(otherStream1, otherStream2, ...)
Connect与 Union 区别:
- Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
- Connect只能操作两个流,Union可以操作多个。
2、支持的数据类型
Flink 会尽力推断有关数据类型的大量信息,这些数据会在分布式计算期间被网络交换或存储。 可以把它想象成一个推断表结构的数据库。在大多数情况下,Flink 可以依赖自身透明的推断出所有需要的类型信息。 掌握这些类型信息可以帮助 Flink 实现很多意想不到的特性:
- 对于使用 POJOs 类型的数据,可以通过指定字段名(比如
dataSet.keyBy("username")
)进行 grouping 、joining、aggregating 操作。 类型信息可以帮助 Flink 在运行前做一些拼写错误以及类型兼容方面的检查,而不是等到运行时才暴露这些问题。 - Flink 对数据类型了解的越多,序列化和数据布局方案就越好。 这对 Flink 中的内存使用范式尤为重要(可以尽可能处理堆上或者堆外的序列化数据并且使序列化操作很廉价)。
- 最后,它还使用户在大多数情况下免于担心序列化框架以及类型注册。
通常在应用运行之前的阶段 (pre-flight phase),需要数据的类型信息 - 也就是在程序对 DataStream
或者 DataSet
的操作调用之后,在 execute()
、print()
、count()
、collect()
调用之前。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。
2.0 Flink 的 TypeInformation 类
类 TypeInformation 是所有类型描述符的基类。该类表示类型的基本属性,并且可以生成序列化器,在一些特殊情况下可以生成类型的比较器。 (请注意,Flink 中的比较器不仅仅是定义顺序 - 它们是处理键的基础工具)
Flink 内部对类型做了如下区分:
-
基础类型:所有的 Java 主类型(primitive)以及他们的包装类,再加上
void
、String
、Date
、BigDecimal
以及BigInteger
。 - 主类型数组(primitive array)以及对象数组
-
复合类型
- Flink 中的 Java 元组 (Tuples) (元组是 Flink Java API 的一部分):最多支持25个字段,null 是不支持的。
- Scala 中的 case classes (包括 Scala 元组):null 是不支持的。
- Row:具有任意数量字段的元组并且支持 null 字段。。
- POJOs: 遵循某种类似 bean 模式的类。
- 辅助类型 (Option、Either、Lists、Maps 等)
- 泛型类型:这些不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
POJOs 是特别有趣的,因为他们支持复杂类型的创建以及在键的定义中直接使用字段名: dataSet.join(another).where("name").equalTo("personName")
它们对运行时也是透明的,并且可以由 Flink 非常高效地处理。
TypeInformation 支持以下几种类型:
- BasicTypeInfo: 任意Java 基本类型或 String 类型
- BasicArrayTypeInfo: 任意Java基本类型数组或 String 数组
- WritableTypeInfo: 任意 Hadoop Writable 接口的实现类
- TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现
- CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
- PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法
- GenericTypeInfo: 任意无法匹配之前几种类型的类
针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。
2.1 基础数据类型
Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …?
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L) numbers.map( n => n + 1 )
2.2 Java和Scala元组(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements( ("Adam", 17), ("Sarah", 23) ) persons.filter(p => p._2 > 18)
2.3 Scala样例类(case classes)
case class Person(name: String, age: Int) val persons: DataStream[Person] = env.fromElements( Person("Adam", 17), Person("Sarah", 23) ) persons.filter(p => p.age > 18)
2.4 Java简单对象(POJOs)
public class Person { public String name; public int age; public Person() {} public Person(String name, int age) { this.name = name; this.age = age; } } DataStream<Person> persons = env.fromElements( new Person("Alex", 42), new Person("Wendy", 23));
2.5 其它(Arrays, Lists, Maps, Enums, 等等)
Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。
3、实现UDF函数----更细粒度的控制流
Flink在使用各种不同算子的同时,为了能更细粒度的控制数据和操作数据,给开发者提供了对现有函数进行扩展的能力
3.1 函数类(Function Classes)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
自定义函数类实现MapFunction接口:
def main(args: Array[String]): Unit = { // TODO 从文件中获取数据源 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; env.setParallelism(1) val list = List( WaterSensor("sensor_1", 150000L, 25), WaterSensor("sensor_1", 150001L, 27), WaterSensor("sensor_1", 150005L, 30), WaterSensor("sensor_1", 150007L, 40) ) val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list) // UDF函数:自定义函数进行数据的处理 //waterSensorDS.map(ws=>(ws.id, ws.vc)) // 也可以使用函数类来代替匿名函数 val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( new MyMapFunction ) mapFunctionDS.print("mapfun>>>") env.execute() } // 自定义UDF函数。来实现映射转换功能 // 1. 继承MapFunction // 2. 重写方法 class MyMapFunction extends MapFunction[WaterSensor, (String, Int)]{ override def map(ws: WaterSensor): (String, Int) = { (ws.id, ws.vc) } }
3.2 匿名函数(Lambda Functions)
val tweets: DataStream[String] = ... val flinkTweets = tweets.filter(_.contains("flink"))
3.3 富函数(Rich Functions)
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- ...
Rich Function有一个生命周期的概念。典型的生命周期方法有:
-
open()
方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。 -
close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。 -
getRuntimeContext()
方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
def main(args: Array[String]): Unit = { // TODO 从文件中获取数据源 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; env.setParallelism(1) val list = List( WaterSensor("sensor_1", 150000L, 25), WaterSensor("sensor_1", 150001L, 27), WaterSensor("sensor_1", 150005L, 30), WaterSensor("sensor_1", 150007L, 40) ) val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list) // UDF函数:自定义函数进行数据的处理 //waterSensorDS.map(ws=>(ws.id, ws.vc)) // 也可以使用函数类来代替匿名函数 val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( new MyMapRichFunction ) mapFunctionDS.print("mapfun>>>") env.execute() } // 自定义UDF 富函数。来实现映射转换功能 // 1. 继承RichMapFunction // 2. 重写方法 class MyMapRichFunction extends RichMapFunction[WaterSensor, (String, Int)] { override def open(parameters: Configuration): Unit = super.open(parameters) override def map(ws: WaterSensor): (String, Int) = { //getRuntimeContext. (ws.id, getRuntimeContext.getIndexOfThisSubtask) } override def close(): Unit = super.close() }