一.定义
Flink是一个分布式计算框架,可以处理海量数据,既可以离线批处理,也可以做实时流处理。主要是用于实时流处理。
flink实时流处理的优势可以归纳为三点:
①低延迟
②高吞吐
③支持精确一次
从上图,可以看出flink可以接受多种数据源数据,比如socket,file,Kafka数据源等,然后通过flink处理计算。此外,flink也支持和主流的资源调度框架的整合,比如k8s,yarn,mesos,使得对整个集群的资源调度更为合理。最后flink可以将结果存到其他的应用系统中,比如hbase,mysql,es等
二.flink的技术亮点
1.在目前所有的实时流计算框架中,flink是唯一一个同时支持低延迟、高吞吐、精确一次语义的实时流计算框架
2.flink既支持事件事件(Event Time),也支持处理时间(Process Time)之前的实时流计算框架都是基于Process Time来处理数据的。Process Time可以理解为:以数据到达计算框架被处理的时间戳为Process Time。
有些时候,可能由于网络波动、网络故障或其他的服务器故障导致实际发送的数据的顺序和实际到达的顺序不一样,导致最终结果不正确性。
Flink可以基于事件时间来进行处理
3.Flink支持状态编程
flink允许用户在实时流计算过程中,获取计算的中间状态,从而实现比较复杂的业务场景。
4.Flink支持精确一次语义
flink通过轻量级分布式快照机制,能够确保数据不丢失,并且能够精确处理
5.Flink实现了独立的内存管理机制
可以说Flink是一种内存计算框架,Flink实现了独立的内存管理机制,减少了JVMGC的依赖,从而降低了JVMGC可能带来的负面影响。
三.Flink-Source
1.CollectSource
2.FileSource
3.SocketSource
4.KafkaSource(下面讲解)
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建参数对象
val properties = new Properties()
//设置kafka的集群列表
properties.setProperty("bootstrap.servers","kafka01:9092,kafka02:9092,kafka03:9092")
//设置zookeeper集群列表
properties.setProperty("zookeeper.connect","zk1:2181,zk2:2181,zk3:2181")
//①参:消费的主题名 ②指定消费的数据类型,普通文本类型③属性参数对象
//flink-kafkasource的并行度和主题的分区数量有关,比如主题一个分区,则并行度为1
val consumer=new FlinkKafkaConsumer011[String]("topicsc",new SimpleStringSchema(),properties)
val source = env.addSource(consumer)
env.execute()
source.print()
四.Flink API
Flink的模块可以分为三个模块:
1.DataSource模块:用于指定flink的数据源,使得flink可以获取
2.Transformation模块(Flink API):用于定义flink对数据的处理逻辑。
3.DataSink模块。用于指定flink的输出目的地,使得flink可以将计算结果输出到其他的应用系统中
Flink API可以分为两类:
1.DataStream API,用于实时流处理的API
2.Data Set,用于离线处理的API
Flink既可以做离线处理,也可以做实时处理,但flink最擅长的就是实时流处理.。
实时系列:
①map,
②flatmap,
③filter,
④keyBy(将Stream指定key并根据key的散列值进行分区),
⑤reduce,
⑥Aggregations(是DataStream结构提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果。其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、max,这样就不需要用户自己定义Reduce函数。),
//数据源helle flink hello
//flink提供一些aggregations函数,比如sum,min,max,这些函数需要作用于keyedStream上
//keyedStream就是调用keyBy产生的
val result = source.flatMap(line=>line.split(" "))
.map(world=>(word,1))
.keyBy(0)//0表示元祖的下标0,本例中以单词key进行分组
//如不加入timeWindow实现对历史数据的累计处理
.timeWindow(Time.of(5,TimeUnit.SECONDS))//实现实时流单词频次统计-实现窗口操作。比如窗口=5s,类比于sparkStreaming的微批流处理形式
.reduce((a,b))//这个就相当于aggregations函数的sum
⑦union,
⑧split&select(func)
⑨connect
五.Flink的并发度(并行度)概念及设置使用
首先先明确Flink的Slot的概念
1.Slot槽位,初学时类比于Yarn中的Container容器概念。是对资源(内存+cpu核数)的一种限定和划分。
2.Slot是针对TaskManager而言的,默认情况下,每个TaskManager的Slot的数量=其CPU核数总和
3.Slot对应TaskManager的资源是均分策略
4.通过设置Flink的并行度,从而提高Flink的开发处理能力。可以认为Flink的并行度就是分区数。
//上图并行度代码
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度,不设置默认为1
env.setParallelism(2)
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val result = source.flatMap(line=>line.split(" ")).map(word=>(word,1))
补充:如果在idea启动,默认并行度等于电脑核数
小结:可以将flink的并行度看做分区,并行度越高,分区数越多,则flink的并行效率越高
//上两张图并行度代码
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val result = source.flatMap(line=>line.split(" ")).setParallelism(2).map(word=>(word,1)).setParallelism(3)
result.print().setParallelism(1)
Flink的并行度设置分为4个层面,算子层面(代码算子后面中写),执行环境层面(env.setParallelism(2)),客户端层面(提交任务页面),系统层面(配置文件中配置)–优先级从前往后,算子优先级最高
六.Flink的窗口计算
Flink的窗口计算,指的是window操作,需要注意的是:窗口计算必须是基于KeyedStream来操作的,即必须在keyBy之后进行窗口计算,在一个窗口内针对同一个key的数据进行处理。
Flink的窗口计算分为:
1.滚动窗口:窗口之间数据没有重叠部分,比如设定窗口的大小=5s
2.滑动窗口:需要指定一个滑动区间以及窗口大小,每个一段时间计算一个窗口数据。即窗口与窗口之间存在重叠数据。比如窗口大小=10s,滑动区间=5s
3.计数窗口,当时间key的数量达到指定计数操作的时候时,触发后续计算:当相同key的数量达到指定计数操作时,触发后续计算
//统计五秒内的wordcount
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordcount.print()
env.execute()
//每隔五秒统计10s窗口内的wordcount
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
//①参:窗口大小②参:滑动区间大小
.timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1)
wordcount.print()
env.execute()
//每隔五秒统计10s窗口内的wordcount
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val wordcount = source.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
//达到指定4计数操作时,触发计算
.countWindow(4)
.sum(1)
wordcount.print()
env.execute()
如果想进入自定义处理逻辑,需要使用ProcessWindowFunction。
模拟场景:对每个窗口内的数据,对相同key的数据做升序
//每隔五秒统计10s窗口内的wordcount
val env = StreamExecutionEnvironment.getExecutionEnvironment
//从nc socket发送数据,数据源:如hello world hello
val source = env.socketTextStream("hadoop01",8888)
val result= source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toInt))
.keyBy(_._1)//如果后续使用自定义的WindowProcess,keyBy使用元祖._位置指定key
.timeWindow(Time.seconds(10))
.process(new MyWindowProcess )
result.print()
env.execute()
/*
泛型①:输入类型
泛型②:输出类型
泛型③:key类型
泛型④:窗口类型
导包时注意:org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
*/
class MyWindowProcess extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
override def process(key:String, context:Context, elements: Iterable[(String,Int)], out:Collecter[String]):
println("key:" + key + "排序后的集合:" + elements.toList.sortBy(x=>x._2>))
//out.collect输出数据
out.collect("")
}
输出结果:
七.Flink Sink
支持(代码百度中有)
1.FileSink
2.KafkaSink
3.HBaseSink
4.JDBC Sink
5.ES Sink
八.Flink的EventTime时间时间与WaterMark(水位线)
Flink支持三种时间语义,分别是:
1.Event Time,事件时间,即数据产生的时间
2.Ingestion Time,接收时间,即数据被FlinkSource组件接收的时间
3.Processing Time,处理时间,即数据被Flink API处理的时间
Flink在流处理应用中,EventTime和Processing Time较多,ingestion Time属于折中方案,用的较少,不做讨论。
综上我们会发现,使用Process Time潜在的问题是:处理的顺序和实际发生的顺序可能是不一致的,导致最终结果是不正确。又或者说,在上述场景中,我们希望等一等迟到的数据。
如果要想处理迟到的数据的乱序问题,就需要使用EventTime和Watermark机制。
EventTime和Watermark机制,需要:
1.设定时间语义:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2.设置一个最大的延迟时间
3.需要结合使用窗口函数
4.产生的数据中需要带有时间戳
Watermark=当前最大的事件时间-设定的最大延迟时间
Watermark称为水位线,如果Watermark越过了窗口对应的结束时间窗口才会关闭和进行计算
综上,我们可以发现使用EventTime和Watermark可以解决迟到数据以及乱序的问题。但不一定能够百分百完美解决。就比如4在6时间戳后面,则4进入不到其对应的窗口,被丢弃处理。
所以使用eventTime和Watermark,使得对迟到数据以及乱序问题的处理提供了可能。
当然也需要衡量延迟与准确性,延迟越大,准确性越高。具体设定为多大的延迟,没有一个固定的标准。
/*
数据源单词统计
hello,1645844000,1
hello,1645845000,2
hello,1645846000,1
hello,1645847000,3
hello,1645848000,1
hello,1645849000,4
hello,1645850000,1
hello,1645851000,5
*/
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.socketTextStream("ip01",8888)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String]){
var maxEventTime = 0L
var watermark = 0L
//设置最大延迟时间1s,注意:1000表示毫秒值
val maxDelay = 1000L
//计算当前的watermark,并返回
override def getcurrentWatermark:Watermark ={
watermark = maxEventTime-maxDelay
new Watermark(watermark)
}
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
//获取当前记录的时间戳
val eventTime = element.split(",")(1).toLong
//计算当前最大的事件时间
maxEventTime = Math.max(eventTime.maxEventTime)
println("当前事件戳:"+eventTime+"上一次的watermark:"+watermark)
//返回当前记录事件
eventTime
}
})
val result = source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toLong,arr(2).toInt))
.keyBy(0).timeWindow(Time.second(3)).sum(2)
result.print()
env.execute()
}
/*
控制台打印
当前事件戳1645844000上一次的watermark-1000
当前事件戳1645845000上一次的watermark1645844000
当前事件戳1645846000上一次的watermark1645845000
当前事件戳1645846000上一次的watermark1645845000
(hello,1645844000,4)【4=1+2+1统计单词数】
*/
另一编码方式,封装了事件时间和watermark
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source = env.socketTextStream("ip01",8888)
//BoundedOutOfOrdernessTimestampExtractor[输入泛型类型](设定最大的超时时间)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(1)){
override def extractTimestamp(element: String): Long = element.split(",")(1).toLong
})
val result = source.map(line=>line.split(","))
.map(arr=>(arr(0),arr(1).toLong,arr(2).toInt))
.keyBy(0).timeWindow(Time.second(3)).sum(2)
result.print()
env.execute()
}
九.底层API(ProcessFuntion)
FlinkAPI可以分为三层。
1)低级API(底层)API:提供了对时间和状态的细粒度控制,如果想要获取底层的数据时间戳(处理时间或事件时间),获取watermark,实现侧输出流以及状态编程,则需要用到低级API,可以说底层API能够做很多复杂的工作。
2)核心API:对底层API进一步封装,主要对流和批数据处理。简单易用。
3)高级API:再进一步封装,通过sql的方式对flink进行。不作为重点,了解即可。
ProcessWindowFunction->在窗口函数操作后使用。
比如:source…key.timewindow.process(ProcessWindowFuntion)
ProcessFunction
比如:source…map/flatMap/filter.process(ProcessFunction)
KeyedProcessFunction
比如:source…keyBy.process(KeyedProcessFunction)
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//tom,m.23
val source = env.socketTextStream("ip01",8888)
.map(line)=>line.split(","))
//自己创建一个Student类
.map(arr=Student(arr(0),arr(1),arr(2).toInt))
.process(new MySplitProcess)
//对应输出等于18岁的侧输出流
val outputTag01 = new OutputTag[String]("stream_1")
//对应输出小于18岁的侧输出流
val outputTag02 = new OutputTag[String]("stream_2")
val sideStream01 = source.getSideOutput(outputTag01)
val sideStream02 = source.getSideOutput(outputTag02)
source.print("主流--")
sideStream01 .print("侧输出流01--")
sideStream02 .print("测输出流02--")
env.execute()
}
//ProcessFunction[输入类型,主流的输出类型]
class MySplitProcess extends ProcessFunntion[Student,Student]{
/*
value:接收当前传入的流数据
ctx:其中的一个功能是:可以输出侧输出流
out:用于输出主流数据
*/
override def processElement(value: Student,ctx: ProcessFunction[Student, Student]#Context,out: Collector[Student]):Unit={
if(value.age>18){
//如果年龄大于18岁,输出到主流
out.collect(value)
}else if(value.age=18){
//对应输出等于18岁的侧输出流
ctx.output(new OutputTag[String]("stream_1"),value.name+":"+value.age)
}else{
//对应输出小于18岁的侧输出流
ctx.output(new OutputTag[String]("stream_2"),value.name)
}
}
}
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//tom,m.23
val source = env.socketTextStream("ip01",8888)
.map(line)=>line.split(","))
//自己创建一个Student类
.map(arr=Student(arr(0),arr(1),arr(2).toInt))
.keyBy("name")//如果keyBy操作的DStream[case class,Student(name,gender,age)],可以直接写成员名
.process(new MyKeyedProcess01)
source.print()
env.execute()
}
/*
keyedProcessFunction[key类型,输入类型,主流输出类型]
key类型固定,必须为Tuple。导包:org.apache.flink.api.java.tuple.Tuple
key为被封装为元祖形式,比如:key为:(tom)
如果想要取元祖的key,需要:val key:String = ctx.getCurrentKey.getField(0)
注意:必须显示指定key类型。
*/
class MyKeyedProcess01 extends keyedProcessFunntion[Tuple,Student,String]{
/*
如果在keyBy后,调用keyedProcessFunction,ctx可以获取当前的key
*/
override def processElement(value: Student,ctx: keyedProcessFunction[Tuple, Student, String]#Context,out: Collector[Student]):Unit={
val key:String = ctx.getCurrentKey.getField(0)
print("key为:" + key)
out.collect("hello")
}
}
def main(arg: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//tom,m.23
val source = env.socketTextStream("ip01",8888)
.map(word=>(word,1))
.keyBy(0)
.process(new MyKeyedProcess02)
source.print()
env.execute()
}
/*
countState.value()获取当前状态里的值
countState.updata("")更新当前状态的值
注意事项:状态对象必须是lazy,确保产生RuntimeContext后再初始化状态对象的。
lazy懒值是真正被调用时才会初始化,而此时RuntimeContext一定是已经生成了的
*/
class MyKeyedProcess02 extends keyedProcessFunntion[Tuple,(String,Int),String]{
lazy val countState = getRuntimeContext.getState(new ValueStateDescriptor[String]("count"),classOf[String]))
override def processElement(value: (String,Int), ctx: keyedProcessFunction[Tuple, (String,Int), String]#Context,out: Collector[String]):Unit = {
if(countStata.value() = null){
//因为最初,状态中肯定是什么都没有的,所以需要给其一个默认值
countState.updata("")
}
//获取key
val key:String=ctx.getCurrentKey.getField(0)
val result = countState.value()+"-"+value_2
countState.updata(result)
//第一次:key是hello result是-1.主流输出的是:hello-1
out.collect(key+result)
}
}
十.Flink的状态编程
上述第二个例子以及涉及状态编程了。
Flink支持有状态的流计算和无状态的流计算
Flink的状态总体来说可以分为两种:
1.算子状态(Operator State)
算子的状态可以理解为直接在map,flatmap,filter这些转换类型算子上使用的状态,算子状态的作用范围是算子任务。
2.键控状态(keyed State)最常用的
可以在keyBy后使用的状态、键控状态的作用范围是每个key维护一个状态,不同key的状态互相之间不共享的。
使用Flink的状态编程:
source…keyBy.process(ProcessFunction)–如果想获取时间戳、定时器、watermark以及状态编程可以使用ProcessFunction
source…keyBy.map(RichMapFunction–如果不需要获取时间戳、定时器等,仅是做时间状态编程,则可以使用对应算子的富函数来实现
source…keyBy.flatMap(RichFlatMapFuntion)
source…keyBy.filter(RichFliterFunction)
/*
学习RichMapFuntion
首先明确一点:map映射方法,一个最根本的特点是:接受一条数据,输出一条数据。
即如果还要实现:接受一条数据,输出多条数据,map和RichMap都无法实现。
实现场景:每次输入一个数字
18 valueState.updata(18)
20 20-valueState.value(18)=2 valueState.updata(20)
25 25-valueState.value(20)=5 valueState.updata(25)
28
输出的是当前数字和上一个数字只差比如:
18
2
5
3
*/
object MapDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).map(new MyMapFuncWithState )
result.print()
env.execute()
}
}
class MyMapFuncWithState extends RichMapFunction[(String,Int),Int]{
lazy val numState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("num",classOf[Int]))
override def map(value: (String, Int)): Int = {
if(numState.value()==0){
//第一条数据来时
numState.updata(value._2)
//map方法输出
value._2
}else{
val result = value._2-numState.value()
//当前的数字更新到状态,便于下一次计算
numState.updata(value._2)
//map方法输出的本次数字和上一次数字之差
result
}
}
}
/*
学习RichFlatMapFuntion完成状态编程。
实现的场景,输入一条数据,输出多条数据
输入:
18
20
输出
初始值:0
本次数据:18
计算之差:18
上一次数据:18
本次数据:20
计算之差:2
*/
object FlatMapDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).flatMap(new MyFlatMapFuncWithState )
result.print()
env.execute()
}
}
class MyFlatMapFuncWithState extends RichMapFunction[(String,Int),String]{
lazy val numState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("num",classOf[Int]))
override def flatMap(value: (String, Int), out: Collector[String]): Unit = {
if(numState.value()==0){
out.collect("初始数据:"+0)
out.collect("本次数据:"+value._2)
out.collect("初始数据:"+value._2)
numState.updata(value._2)
}else{
out.collect("上一次数据:"+numState.value())
out.collect("本次数据:"+value._2)
numState.updata(value._2)
val result = value._2-numState.value()
out.collect("计算之差:"+result)
}
}
}
/*
实现的场景:每次输入数据。如果本次数据与历史所有数据(包含当前数据)均值大于5,则输出,否则过滤掉
主要是用一下ListState(列表状态),因为之前使用的都是ValueState(值状态)
输入
18
20--->当前历史所有数据均值是19
25--->当前历史所有数据均值是21
36--->当前历史所有数据均值是25
输出
18
36
*/
object FilterDriver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("ip01",8888)
val result = source.map(num=>"key",num.toInt)).keyBy(0).filter(new MyFliterMapFuncWithState)
result.print()
env.execute()
}
}
class MyFliterMapFuncWithState extends RichFilterFunction[(String,Int)]{
lazy val numState = getRuntimeContext.getListState(new ListStateDescriptor[Int]("num",classOf[Int]))
//返回值是boolean类型
override def filter(value: (String, Int)): Boolean = {
if(!nums.get().iterator().hasNext){
//如果ListState里的数据为空,说明是最开始的状态
nums.add(value._2)
true
}else{
nums.add(value._2)
var sum = 0
var count = 0
val numsIt= nums.get().iterator()
while(numsIt.hasNext){
sum = sum.numsIt.next()
count = count+1
}
val avg = sumn/count
println("当前历史数据的均值为:"+avg)
val flag = if((value._2-avg).abs)true else false
flag
}
}
}
十一.Flink的数据一致性保障(通过轻量级分布式快照实现)
在实时流处理系统中,数据一致性保障可以分为三种:
1.at most once 至多一次,实际上没有可靠性的委婉说法,即数据存在丢失的可能
2.at least once 至少一次,可以确保数据丢失,但可能会重复处理从而导致结果的不正确
3.exectly once精确一次,可以确保数据不丢失,并且精确处理,保证最终结果的正确性
曾经,at least once很流行,典型的代表框架Storm,Samza,底层可以保障at least once。即能够确保数据不丢失,但不能保障数据不重复处理,如果用户要实现精确一次予以,需要自己实现(比如通过幂等性操作)
后来,StormTrident和sparkStreaming框架出现了,地城支持了精确一次语义。但也付出了一定代价,都是以批的处理机制来处理流i数据,即在批的层面上实现了精确一次的语义,即一批数据要么全都成功,要么全部失败。这么做提升了吞吐量,但关键是牺牲了低延迟的特性。
目前Flink可以说是业界内唯一同时支持:高吞吐、低延迟、精确一次语义的实时流框架
轻量级分布式快照:举一个游戏
游戏过程与技术的参照
①珠子:实时的数据
②数珠子过程:实时流数据处理的过程
③必须数准:精确一次语义
④病人:实时流计算框架
⑤脑补疾病:实时流计算框架工作时可能突发的各种异常状态
⑥助手以及系皮筋一系列措施:flink的轻量级分布式快照机制
Flink的轻量级分布式快照机制:
①需要设定checkpoint(看做是哪个皮筋,在技术中成为快照)周期,比如3s
②需要设定checkpoint.dir,快照的存储位置。flink支持将快照存到内存中(测试环境)或文件系统中(生产环境)
chk一般为检查点屏障,可以看做是一条特殊的记录,会触发flink checkpoint相关的操作
flink的快照机制是属于异步处理机制,性能较高,此外当新快照生产后,老快照会被覆盖。所以快照资源不会占用很多。
object Driver{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置checkPointing周期
env.enableCheckpointing(3000)
//指定checkpoint最大并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//指定数据可靠性保障为精确一次语义
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//flink的状态管理器:①.MemoryStateBackend ②.FsStateBackend③.RocksDBStateBacnkend
//1.默认的是MemoryStateBackend。将快照存在内存中,好处是恢复时从内存恢复速度快。缺点不可靠,快照状态可能会丢失。此外在生产环境下可能会造成内存溢出风险
//2.所以在生产环境下可以FsStateBackend和RocksDBStateBacnkend(以序列化形式存储,额外导包),将快照存到文件
env.setStateBackend(new RocksDBStateBackend("hdfs地址"))
//hello world hello
//如果要想让flink实现真正意义上的精确一次,还需要确保端到端的数据一致性。所以需要数据源是Kafka
val source = env.socketTextStream("ip01",8888)
val wordcount = source.flatMap(line=>line.split(" ")).map(word=>(word,1)).keyBy(0).sum(1)
wordcount.print()
env.execute()
}
}
注意:如果要想让flink实现真正意义上的精确一次,还需要确保端到端的数据一致性。所以在生产环境下,flink的数据源基本是从kafka过来。因为kafka是否被消费数据都会存在,chk会记录它的位置,flink和kafka是完美的配合