1、基本概念(了解)
①流(Streaming):
是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断地送出,使用户听到的声音或看到的图象十分平稳,
而且用户在整个文件送完之前就可以开始在屏幕上浏览文件。
②常见的流式计算框架
Apache Storm
Spark Streaming
Apache Flink
上述三种实时计算系统都是开源的分布式系统,具有低延迟、可扩展和容错性诸多优点,它们的共同特色在于:
允许你在运行数据流代码时,将任务分配到一系列具有容错能力的计算机上并行运行。
此外,它们都提供了简单的API来简化底层实现的复杂程度。
③Spark Streaming:
Spark Streaming 其实是构建在spark core之上的一个应用程序,支持实时数据流的处理。
实时处理大致流程:
输入—–计算—–输出
1. 输入:可以从Kafka,Flume,HDFS等获取数据
2. 计算:我们可以通过map,reduce,join等一系列算子通过spark计算引擎进行计算(基本和RDD一样,使用起来更方便。)
3. 输出:可以输出到HDFS,数据库,HBase等。
特点:
高可扩展性:可以运行在上百台机器上
低延迟:可以在秒级别上对数据进行处理
高可容错性
能够集成并行计算程序
2、Spark Streaming工作原理
①基础概念理解:
1. 离散流:是我们处理的一个实时数据流,在spark streaming中对应一个DStream,DStream本质是一组时间序列上连续的RDD;
2. 时间片(批处理时间间隔 Batch Interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。
注:通过时间片设置的时间将连续的流数据变化为一批一批的数据,这个过程叫批处理
3. 窗口长度(windowDuration):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数。
注:比如说要每隔5分钟统计过去30分钟的数据,窗口长度为30分钟,30分钟是batch interval (批处理时间间隔)的倍数;
4. 滑动时间间隔(slideDuration):前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数。
注:比如说要每隔10分钟统计过去30分钟的数据,窗口时间间隔为10分钟;batch interval (批处理时间间隔)的倍数
5. input DStream :一个inputDStream是一个特殊的DStream 将spark streaming连接到一个外部数据源来读取数据。
6. Receiver :长时间(可能7*24小时)运行在Excutor之上,每个Receiver负责一个inuptDStream (比如读取一个kafka消息的输入流)。
7. JobGenerator: 主要是从DStream产生job, 且根据指定时间执行checkpoint. 他维护了一个定时器,该定时器在批处理时间到来的时候会进行生成作业的操作。
8. JobScheduler: 主要用于调度job。JobScheduler主要通过JobGenerator产生job,并且通过ReceiverTracker管理流数据接收器Receiver
注:JobGenerator每隔一段时间从DStream中产生job,且根据时间执行checkpoint!!!!
注:设置时间片(批处理时间间隔)为5分钟,那么流数据,每五分钟对应一个RDD实例! 每隔5分钟统计过去30分钟的数据,那么每隔五分钟处理6个RDD,也就是一个DStream
注:DStream本质是一组时间序列上连续的RDD。每个RDD都包含了自己特定时间间隔内的数据流
②工作流程(根据代码)
1、客户端提交作业后启动Driver,Driver是Spark作业的Master。
2、Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。 //未开启wal机制,使用内存备份,但是只备份1份
3、Driver定时启动JobGenerator,根据DStream生成逻辑RDD,然后创建Jobset,交给JobScheduler。 //将DStream转化为RDD,然后创建Jobset
4、JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个stage包含一到多个task。
5、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。
注:每个Spark Streaming至少包含一个receiver task。 //receiver task不在Driver节点上,是RM给任务分配的一个剩余资源多的节点
注:每个Receiver负责一个input DStream(连接一个外部数据源来读取数据),具体的数据如果未开启wal,默认保存在内存中,但会内存备份
③工作流程(根据逻辑)
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的
输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD,
然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中(比如shuffle前的数据落地)
3、DStream操作
①普通转换 map、flatMap、filter repartition union、join(otherStream, [numTasks]) count、reduce countByValue、reduceByKey(func, [numTasks]) cogroup(otherStream,[numTasks]) //两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams transform (func) //对原DStream的每个RDD做func操作, updateStateByKey(func) //这个方法可以被用来维持每个键的任何状态数据。就是维持每个job的k,v值,然后可以进行处理,可以用于全局key计数 注:https://blog.csdn.net/qq_21383435/article/details/80573725 (transform过滤黑名单实例)
注:https://blog.csdn.net/lmb09122508/article/details/80537881 (updateStateByKey全局计数) 注:UpdateStateByKey:统计全局的key的状态,但是就算没有数据输入,他也会在每一个批次的时候返回之前的key的状态。
假设5s产生一个批次的数据,那么5s的时候就会更新一次的key的值,然后返回。
这样的缺点就是,如果数据量太大的话,而且我们需要checkpoint数据,这样会占用较大的存储。 MapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。
这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。
这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。 ②窗口转换 window (windowLength, slideInterval) //返回一个基于参数slideInterval * windowLength 计算后得到新的DStream。 countByWindow (windowLength,slideInterval) //返回基于滑动窗口的DStream中的元素的数量 reduceByWindow (func, windowLength,slideInterval) //基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。 reduceByKeyAndWindow (func,windowLength, slideInterval, [numTasks]) reduceByKeyAndWindow (func, invFunc,windowLength, slideInterval, [numTasks]) countByValueAndWindow (windowLength,slideInterval, [numTasks]) ③输出操作 print() //在Driver中打印出DStream中数据的前10个元素 saveAsTextFiles saveAsObjectFiles saveAsHadoopFiles foreachRDD(func) //遍历每个RDD 注:无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD
注:有状态的转化操作updateStateByKey、窗口转换
4、Spark Streaming的容错和数据无丢失机制(WAL机制 <------>预写文件机制)
背景:对于一些文件的数据源,driver的恢复机制可以保证数据无丢失,因为所有的数据都保存在HDFS或S3上面,因为节点宕机可以在现有的数据上重新计算。
对于一些像kafka,flume等数据源,接收的数据保存在内存中将有可能丢失,这是因为spark应用分布式运行的,如果driver进程挂了,所有的executor进程将团灭,
保存在这些进程所持有内存中的数据将会丢失
注:Spark也有内存备份,所以不是整个集群挂掉的话(driver宕机),内存中的数据也比较难丢失。不开启wal的话,数据保存在内存中!
wal:
WAL使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,
恢复的时候可以重新读取日志文件再次进行操作
对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,
向数据源确认接收到数据,然后把数据存储在executor的内存中,然后driver在exector上运行任务处理这些数据。(数据越大,则对内存要求越高)
如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性,此外,如果只有在数据 写入到log中之后接收器才向数据源确认,
这样drive重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。
注:wal的日志文件是保存在HDFS或者本地磁盘上的
注:但是实时数据过来,会进入缓冲区,然后会写入一个持久化的日志中,但是缓存区的数据不会清空,会直接拿来进行spark计算!!持久化的日志仅仅是系统挂了,用于恢复数据!!
但是还是容易内存溢出,所以wal在实际中,如果不开启,则内存必须够!
注: 值得注意的是WAL开启了以后会减少Spark Streaming处理数据的吞吐,因为所有接收的数据会被写到到容错的文件系统上,这样文件系统的吞吐和网络带宽将成为瓶颈。
可以通过添加更多接收器进行解决
wal实现:
给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统,用来保存WAL和做Streaming的checkpoint
checkpoint("hdfs://xxxx/xx")
5、Spark Streaming中的Receiver方式和直连方式
Spark Streaming从Kafka中接受数据的时候有两种方式,一种是使用Receiver的老方法,另一种是使用直连的方法
1.Receiver方式
从Kafka中拉取数据,每次接受固定时间间隔的数据存储到内存中,但是这样可能会因为数据量太大,而造成内存溢出,溢出的数据就会发生丢失。
所以要么内存充足,要么使用WAL,将数据通过预写日志中保存在HDFS或者磁盘中,以保证数据不丢失。但一般为了保证突发情况,都是开启WAL。
(如果Receiver的是Kafka,那么WAL写入还是数据!!不是偏移量)
实际生产中,已经不使用这种了。
2.director(直连方式)
这种方法不是使用接收器来接收数据,而是定期向Kafka查询每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围,
然后直接从kafka的分区文件中进行读消息,而不是将这个偏移量范围内的数据全部加入内存。
当启动处理数据的作业时,Kafka的简单消费者API用于读取Kafka定义的偏移范围(类似于从文件系统读取的文件)
该方法是直接将RDD中的分区连接到Kafka的分区上,相当于连接了一条水管,这样读取数据的效率更高(但是需要自己维护偏移量---定期维护每个kafka的每个主题每个分区的偏移量)
1、直连方式从Kafka集群中读取数据,并且在Spark Streaming系统里面维护偏移量相关的信息,实现零数据丢失,保证不重复消费,比createStream更高效(数据读且只读一次)
2、创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。(读数据、处理数据在同一个节点!这样可以边读边处理!速度得到提高)
3、zookeeper更新zookeeper中的consumer offsets还需要自己去实现。但是Receiver不用手动实现。
大体思路:
在采用直连的方式消费kafka中的数据的时候,大体思路是首先获取保存在zookeeper中的偏移量信息,
根据偏移量信息去创建stream,消费数据后再把当前的偏移量写入zk中。
//zookeeper更新zookeeper中的consumer offsets还需要自己去实现,而不是zookeeper进行监控
//director不用开启wal机制,因为数据不会发生丢失!!
优势:
1、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中
2、降低内存。在计算时读取数据,然后直接计算,而不是数据全部在内存中。所以对内存的要求很低
不足:
1、提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets(zk、mysql等),
而不像Receiver-based那样,通过ZooKeeper自动维护Offsets,此提高了用户的开发成本。
注:consumer offsets是必须要保存的,但是不一定用zookeeper,可以用checkpoint,mysql等。(Checkpoint保存偏移量,Receiver Checkpoint是开启wal,其中保存的是数据)
6、Spark Streaming 容错、持久化和性能调优
①容错 DStream基于RDD组成,RDD的容错性依旧有效: 1、RDD是一个不可变的、确定性的可重复计算的分布式数据集。RDD的某些partition丢失了,可以通过血统(lineage)信息重新计算恢复; 2、如果RDD任何分区因worker节点故障而丢失,那么这个分区可以从原来依赖的容错数据集中恢复; //节点故障,从数据中进行重新计算 3、即使集群出现故障,只要输入数据集存在,所有的中间结果都是可以被计算的 //集群故障,从数据中进行重新计算 Spark Streaming是可以从HDFS和S3这样的文件系统读取数据的,这种情况下所有的数据都可以被重新计算,不用担心数据的丢失。
但是在大多数情况下,Spark Streaming是基于网络来接受数据的, 1、内存备份:在接受网络的数据时会在集群的多个Worker节点间进行数据的复制(默认的复制数是2)
2、wal:预写日志文件 ②持久化 1、缓存(persisit) ①Dstream是由一系列的RDD构成的,它同一般的RDD一样,也可以将流式数据持久化,采用同样的persisit方法,调用该方法后DStream将持久化所有的RDD数据。 ②reduceByWindow、reduceByKeyAndWindow等基于窗口操作的方法,它们默认都是有persisit操作的。因为这些都是相当于全局运算。所以系统会自动缓存 2、Checkpoint Spark Streaming应用程序如果不手动停止,则将一直运行下去,在实际中应用程序一般是24小时*7天不间断运行的,因此Streaming必须对诸如系统错误,
JVM出错等与程序逻辑无关的错误(failures)具体很强的弹性,具备一定的非应用程序出错的容错性。Spark Streaming的Checkpoint机制便是为此设计的
它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复。 ①什么时候需要启用checkpoint? 1、对于窗口和有状态的操作如updateStateByKey、reduceByKeyAndWindow必须checkpoint,必须通过StreamingContext的checkpoint来指定目录
来允许定时的RDD checkpoint 。因为这些操作都是相当于全局运算。不指定目录,不容易恢复。 2、希望能从意外中恢复driver ,如非程序的系统错误! 3、Receiver可以用Checkpoint开启wal机制,Director也可以用Checkpoint保存偏移量(Receiver在checkpoint 存入的是数据,Director存入的是偏移量) 注:类似的全局操作都必须使用checkpoint ,不管是director还是Receiver方式连接kafka ②如何使用checkpoint? ssc.checkpoint(ckeckpointDirectory) //指定目录 - 若application为首次重启,将创建一个新的StreamContext实例
- 如果application是从失败中重启,将会从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。 val ssc = StreamingContext.getOrCreate(ckeckpointDirectory,functionToCreateContext _) //从现有的ckeckpointDirectory读取ssc, 需要注意的是,随着streaming application的持续运行,checkpoint数据占用的存储空间会不断变大。
因此,需要小心设置checkpoint的时间间隔。设置的越小 ,checkpoint次数会越多,占用空间会越大;
如果设置越大,会导致恢复时丢失的数据和进度越多。 ③调优 1. 优化运行时间 ①增加并行度 确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源; ②减少数据序列化,反序列化的负担 ③设置合理的batch duration(批处理时间间) 在Spark Streaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的作业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,
那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。
因此设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内结束时必须的; 2. 优化内存使用 ①控制batch size(批处理间隔内的数据量) Spark Streaming会把批处理间隔内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存中
能容纳这个批处理时间间隔内的所有数据,否则必须增加新的资源以提高集群的处理能力; ②及时清理不再使用的数据 前面讲到Spark Streaming会将接受的数据全部存储到内部可用内存区域中,因此对于处理过的不再需要的数据应及时清理,
以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据,
这个参数需要小心设置以免后续操作中所需要的数据被超时错误处理;