Flink DataStream编程指南及使用注意事项。
浪尖 浪尖聊大数据
Flink中的DataStream程序是对数据流进行转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。数据流的最初的源可以从各种来源(例如,消息队列,套接字流,文件)创建,并通过sink返回结果,例如可以将数据写入文件或标准输出。Flink程序以各种上下文运行,独立或嵌入其他程序中。执行可能发生在本地JVM或许多机器的集群上。
一,套接字流
下面举一个例子,该例子,数据来源是网络套接字,带窗口的流处理,窗口大小是5s,这些概念玩过spark Streaming应该都很清楚,我们后面也会给大家详细讲解。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Created by spark on 2017/8/31.
*/
object WindowWordCount {
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print
env.execute("Window Stream WordCount")
}
}
}
要测试该demo,首先要启动一个tcp server监听9999端口,这里可以使用netcat。
nc -lk 9999
这样就可以用nc统计,然后DataStream输出结果。根据不同的模式会有不同的输出位置,提交到集群的话,要去日志里查看。
该代码可以直接粘贴复制到你自己的工程,只需要导入Flink的相关依赖,具体工程构建方法,请参考<构建Flink工程及demo演示>。
二,数据转换
数据转换将一个或多个数据流转换成新的DataStream。 程序可以将多种转换组合成复杂的拓扑结构。
下面介绍常用的操作,然后举例讲解。
1,map操作
DataStream → DataStream
取出一个元素,产生一个元素。比如,使用map将数据元素乘以2.
dataStream.map { x => x * 2 }
2,FlatMap
DataStream → DataStream
取出一个元素,产生零个,一个,或者多个元素。比如,可以使用flatmap分割句子。
dataStream.flatMap { str => str.split(" ") }
3,Filter
DataStream → DataStream
每个函数都去执行一个布尔函数,并保留使得布尔函数返回为true的元素。
过滤零值的函数。
dataStream.filter { _ != 0 }
4,KeyBy
DataStream → KeyedStream
将流逻辑分区为不相交的分区,每个分区包含相同key的元素。在内部,这是用哈希分区来实现的。看上篇文章来了解如何指定一个key。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
5,Reduce
KeyedStream → DataStream
滚动的聚合keyedStream.将当前元素和上一个聚合的元素进行合并,返回一个新的值。
比如,将相同key的值进行求和。
keyedStream.reduce { _ + _ }
6,Fold
KeyedStream → DataStream
滚动聚合一个KeyedStream,需要指定一个初始值。合并当前的值和前一次合并过(floded)的值,返回一个新值。
比如,下面的折叠函数,假如作用于一个序列(1,2,3,4,5),会返回一个序列:"start-1", "start-1-2", "start-1-2-3", ...
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
7,Aggregations
KeyedStream → DataStream
KeyedStream 进行滚动聚合。min和minBy之间的区别在于min返回最小值,而minBy返回该字段中具有最小值的元素(对于max和maxBy相同)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
8,Window
KeyedStream → WindowedStream
作用于partitioned KeyedStreams.根据一些特性(如,最近五分钟的数据)进行按key分组。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
对于windows操作后面会出文章详细介绍。
9,WindowAll
DataStream → AllWindowedStream
可以在常规DataStreams上定义Windows。Windows根据一些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。
注意:
这在许多情况下是非并行的转换。所有记录将被收集到windowAll运算符的一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
10,Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用功能作为一个整体应用于窗口。 以下是手动对窗口元素求和的函数。如果您正在使用windowAll转换,则需要使用AllWindowFunction。
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
11,Window Reduce
WindowedStream → DataStream
对窗口应用Reduce,并返回Reduce的值。
windowedStream.reduce { _ + _ }
12,Window Fold
WindowedStream → DataStream
对窗口应用fold 功能并返回fold 后值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠到字符串“start-1-2-3-4-5”中:
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
13,Aggregations on windows
WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别在于min返回最小值,而minBy返回该字段中具有最小值的元素(对于max和maxBy相同)。
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
14,Union
DataStream* → DataStream
两个或多个数据流的联合创建一个包含来自所有流的所有元素的新流。注意:如果将数据流与本身结合在一起,您将在结果流中获取每个元素两次。
dataStream.union(otherStream1, otherStream2, ...)
15,Window Join
DataStream,DataStream → DataStream
给定的key和公共窗口上对两个流进行join。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
16,Window CoGroup
DataStream,DataStream → DataStream
在给定的key和公共窗口上Cogroups两个数据流。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
17,Connect
DataStream,DataStream → ConnectedStreams
“Connects”两个数据流保留其类型,允许两个流之间的共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
18,CoMap, CoFlatMap
ConnectedStreams → DataStream
与connected data stream上的map和flatMap类似。
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
19,Split
DataStream → SplitStream
根据一些标准将流分成两个或更多个流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
20,Select
SplitStream → DataStream
从拆分流中选择一个或多个流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
21,Iterate
DataStream → IterativeStream → DataStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开始,并持续应用迭代。大于0的元素将发送回反馈通道,其余元素将在下游转发。后面会出完整代码的文章。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
22,Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便使用使用事件时间语义的窗口。后面会详细介绍事件时间,可以先看了解概念。
stream.assignTimestamps { timestampExtractor }
通过匿名模式匹配从元组,case class和集合中提取,如下所示:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
并不支持API即开即用。为了使用这个特性,你需要使用scala API 扩展,这个后面出文章介绍。
三,物理分区(Physical partitioning)
Flink给出了流转换后对流分区精确控制的相关功能函数。
1,Custom partitioning
DataStream → DataStream
使用用户定义的分区器为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
2,Random partitioning
DataStream → DataStream
按照均匀分布随机分配元素。
dataStream.shuffle()
3,Rebalancing (Round-robin partitioning)
DataStream → DataStream
轮训的方式为每个分区创建相同的负载。在数据倾斜的情况下用于性能的优化。
dataStream.rebalance()
4,Broadcasting
DataStream → DataStream
将元素广播到每个分区。
dataStream.broadcast()
5,Rescaling(重新缩放)
DataStream → DataStream
分区元素,循环,到下游操作的一个子集。该方法对这种情况有用,比如,你想拥有pipeline,也即你想将数据源的每个实例的数据分散到mappers的子集中去,以实现均衡负载,但是又不想进行全部的再平(这种情况下,rebalance()可以使用)。这将仅需要本地数据传输,而不是通过网络传输数据,具体取决于其他配置值,例如TaskManager的slot数。
upstream 发送元件给downstream 操作的子集取决于upstream 和downstream 操作的并行程度。例如,如果upstream 操作具有并行性2,并且downstream 操作具有并行性4,则一个upstream 操作将将元素分配到两个downstream 操作,而另一个upstream 操作将分配到另外两个downstream 操作。另一方面,如果downstream操作具有并行性2,而upstream 操作具有并行性4,则两个upstream 操作将分配到一个downstream 操作,而另外两个upstream 操作将分配到其他downstream 操作。
在不同并行度不是彼此的倍数的情况下,一个或多个downstream 操作将具有来自upstream 操作的不同数量的输入。
dataStream.rescale()
四,任务链和资源组
链接两个连续的转换操作,意味着将它们运行在相同的线程中,借此提升整体性能。Flink默认情况下假如有可能的话会将操作组成链条,比如两个连续的map操作。如果需要,API可以对链接进行细粒度控制:调用StreamExecutionEnvironment.disableOperatorChaining() 如果你想禁止链式操作的话。请注意,这些函数只能在DataStream转换后才可以使用,因为它们是指先前的转换。例如,你可以使用someStream.map(...).startNewChain(),但是你不能用someStream.startNewChain()。
Flink中一个资源组就是一个slot。如果需要,您可以手动隔离独立slot中的操作符。
1,Start new chain
开始一个新的链,从这个操作算子开始。下面的例子, 两个map将被链接,并且filter不会链接到第一个map。
someStream.filter(...).map(...).startNewChain().map(...)
2,Disable chaining
不要链接map操作符
someStream.map(...).disableChaining()
3,Set slot sharing group
设置操作的slot共享组。Flink会将在想同slot共享组的操作放入同一个slot,同时将没有slot共享组的操作保持在其它的slot。这个可以用来隔离slot。如果所有输入操作都在相同的slot共享组中,则slot分配组将从输入操作继承。 默认slot共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将操作明确地放入此组。
someStream.filter(...).slotSharingGroup("name")
五,数据源
数据源是Flink项目读取数据之处。可以使用StreamExecutionEnvironment.addSource(sourceFunction). 为你的项目添加一个数据源。Flink有一些预先写好的数据源函数,但是你往往需要实现SourceFunction 来实现一些非并发的数据源。也可以实现ParallelSourceFunction 接口或者继承RichParallelSourceFunction 来实现并行的数据源。
可以使用StreamExecutionEnvironment调用预先实现的数据源方法:
1,File-based
A),readTextFile(path) - 读取文本文件,即遵循TextInputFormat规范的文件,逐行读取并将其作为字符串返回。
B),readFile(fileInputFormat,path) - 按照指定的文件输入格式读取(一次)文件。
C),readFile(fileInputFormat,path,watchType,interval,pathFilter) - 这是前面两个函数内部调用的方法。 它根据给定的fileInputFormat读取路径中的文件。根据提供的watchType,此源可以定期监视(每个间隔ms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除一些不需要文件被处理。
实现:
在后台,Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独的实体实现。目录监控是通过单个非并行(parallelism = 1)任务实现的,而读取由并行运行的多个任务执行。后者的并行性等于job并行性。。单个目录监控任务的作用是扫描目录(根据watchType定期或只有一次),找到要处理的文件,将它们分割,并将这些拆分分配给下游reader。reader是读取实际数据的reader。每个拆分只能由一个reader读取,而reader可以逐个读取多个分片。
重要笔记:
如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。 这可以打破“一次”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。
如果watchType设置为FileProcessingMode.PROCESS_ONCE,则源扫描路径一次并退出,而不等待reader完成文件内容的读取。当然reader会继续读取,直到读取所有的文件内容。关闭源头之后,不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。
2,Socket-based
从套接字读取。 元素可以由分隔符分隔。
3,Collection-based
A),fromCollection(Collection) 从Java Java.util.Collection创建数据流。 集合中的所有元素必须是相同的类型。
B),fromCollection(Iterator,Class) - 从迭代器创建数据流。 该类指定迭代器返回的元素的数据类型。
C),fromElements(T ...) - 从给定的对象序列创建数据流。 所有对象的类型必须相同。
D),fromParallelCollection(Splitable Iterator,Class) - 并行创建一个迭代器的数据流。 该类指定迭代器返回的元素的数据类型。
E),generateSequence(from,to) - 并行生成给定间隔中的数字序列。
4,Custom
addSource - 附加一个新的源函数。 例如,要从Apache Kafka中读取,您可以使用addSource(new FlinkKafkaConsumer08 <>(...))。
六,DataSinks
Data sinks 从DataStream中获取数据,并将它们写入文件,Socket,外部存储,或者打印出来。Flink也是提供了一下输出的格式。
1,writeAsText() / TextOutputFormat
将元素以行格式的字符串形式写入。 字符串通过调用每个元素的toString()方法获得。
2,writeAsCsv(...) / CsvOutputFormat
将元组写入逗号分隔的值文件。 行和字段分隔符是可配置的。 每个字段的值来自对象的toString()方法。
3,print() / printToErr()
在标准输出/标准错误流中打印每个元素的toString()值。可选地,可以提供前缀(msg),其被添加到输出。这样可以区分不同类型的打印输出。并行度大于一,输出也会包含任务台标识符。
4,writeUsingOutputFormat() / FileOutputFormat
自定义文件输出的方法和基类。 支持自定义对象到字节转换。
5,writeToSocket
根据SerializationSchema将元素写入套接字
6,addSink
调用自定义sink函数。 Flink与其他系统(如Apache Kafka)的connectors 捆绑在一起,实现sink功能。
七,Iterations
迭代流程序实现一个step function(步进函数)并将其嵌入到IterativeStream中。由于DataStream程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定流的哪一部分反馈到Iterations,哪个部分使用拆分转换或过滤器向downstream 转发。在这里,我们展示一个iteration 示例,其中主体(重复的计算的一部分)是简单的映射变换,反馈的元素和转发给downstream的元素有filter区分。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
})
默认情况下,反馈流的分区将自动设置为与迭代头的输入相同。 要覆盖此,用户可以在closeWith方法中设置一个可选的布尔标志。
例如,这里是从一系列整数连续减1的程序,直到它们达到零:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
八,执行参数
StreamExecutionEnvironment包含ExecutionConfig,它允许为运行时设置作业特定的配置值。有关大多数参数的说明,请参阅执行配置。 这些参数与DataStream API有关:
A),enableTimestamps()/ disableTimestamps():将时间戳附加到从源发送的每个事件。areTimestampsEnabled()返回当前值。
B),setAutoWatermarkInterval(long milliseconds):设置automatic watermark emission(自动阈值发射)的间隔。 你可以用long getAutoWatermarkInterval()获取当前值。
九,Fault Tolerance
主要是将Flink的checkpoint。Checkpoint主要是表述如何使能和配置Flink的checkpoint机制,后面会出文章详细介绍。
十,控制延迟
默认情况下,元素不会逐个传输(这将导致不必要的网络流量)而是被缓存。可以在Flink配置文件中设置缓冲区的大小(实际上在机器之间传输)。虽然这种方法对于优化吞吐量是有好处的,但是当输入流不够快时,它可能会导致延迟问题。为了控制吞吐量和延迟,您可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填满的最大等待时间。此后,缓冲区会自动发送,即使它们未满。 此超时的默认值为100 ms。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
为了最大化吞吐量,请设置setBufferTimeout(-1),这将删除超时,缓冲区只有在满时才会刷新。为了最小化延迟,将超时设置为接近0的值(例如5或10 ms)。应避免缓冲超时时间为0,因为它可能导致严重的性能下降。
十一,Debugging
在分布式集群中运行流程序之前,最好确保实现的算法可以按需要工作。 因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。
Flink提供了通过支持IDE内的本地调试,输入测试数据和收集结果数据来显著简化数据分析程序开发过程的功能。本节给出一些提示如何缓解Flink程序的开发。
1,本地执行环境
LocalStreamEnvironment会在创建的同一个JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,可以在代码中设置断点并轻松调试程序。创建并使用LocalEnvironment,如下所示:
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
2,集合数据源
Flink提供了由Java集合支持的特殊数据源,以简化测试。一旦程序测试通过,Source和sink可以容易地被从外部系统读取/写入的Source和sink替换。
集合数据源可以使用如下:
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意:
目前,集合数据源要求数据类型和迭代器实现Serializable。 此外,集合数据源不能并行执行(parallelism = 1)。
3,Iterator Data Sink
Flink还提供了一个接收器来收集DataStream的测试和调试结果。 它可以使用如下:
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
kafka,hbase,spark等源码入门到深入,spark机器学习,大数据安全,大数据运维,请关注浪尖公众号,看高质量文章。