《Spark大数据分析实战》——3.2节Spark Streaming

本节书摘来自华章社区《Spark大数据分析实战》一书中的第3章,第3.2节Spark Streaming,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.2 Spark Streaming
Spark Streaming是一个批处理的流式计算框架。它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性。下面将对Spark Streaming进行详细的介绍。
3.2.1 Spark Streaming简介
Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力。Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就是RDD)都会生成一个Spark Job进行处理,最终以批处理的方式处理每个时间片的数据。请参照图3-6。


《Spark大数据分析实战》——3.2节Spark Streaming

Spark Streaming编程接口和Spark很相似。在Spark中,通过在RDD上用Transformation(例如:map, f?ilter等)和Action(例如:count, collect等)算子进行运算。在Spark Streaming中通过在DStream(表示数据流的RDD序列)上进行算子运算。图3-7为Spark Streaming转
化过程。


《Spark大数据分析实战》——3.2节Spark Streaming

图3-7中Spark Streaming将程序中对DStream的操作转换为DStream有向无环图(DAG)。对每个时间片,DStream DAG会产生一个RDD DAG。在RDD中通过Action算子触发一个Job,然后Spark Streaming会将Job提交给JobManager。JobManager会将Job插入维护的Job队列,然后JobManager会将队列中的Job逐个提交给Spark DAGScheduler,然后Spark会调度Job并将Task分发到各节点的Executor上执行。
(1)优势及特点
1)多范式数据分析管道:能和Spark生态系统其他组件融合,实现交互查询和机器学习等多范式组合处理。
2)扩展性:可以运行在100个节点以上的集群,延迟可以控制在秒级。
3)容错性:使用Spark的Lineage及内存维护两份数据进行备份达到容错。RDD通过Lineage记录下之前的操作,如果某节点在运行时出现故障,则可以通过冗余备份数据在其他节点重新计算得到。


《Spark大数据分析实战》——3.2节Spark Streaming

5)实时性:Spark Streaming也是一个实时计算框架,Spark Streaming能够满足除对实时性要求非常高(例如:高频实时交易)之外的所有流式准实时计算场景。目前Spark Streaming最小的Batch Size的选取在0.5~2s(对比:Storm目前最小的延迟是100ms左右)。
(2)适用场景
Spark Streaming适合需要历史数据和实时数据结合进行分析的应用场景,对于实时性要求不是特别高的场景也能够胜任。
3.2.2 Spark Streaming架构
通过图3-10,读者可以对Spark Streaming的整体架构有宏观把握。


《Spark大数据分析实战》——3.2节Spark Streaming

组件介绍:
Network InputTracker:通过接收器接收流数据,并将流数据映射为输入DStream。
Job Scheduler:周期性地查询DStream图,通过输入的流数据生成Spark Job,将Spark Job提交给Job Manager进行执行。
JobManager:维护一个Job队列,将队列中的Job提交到Spark进行执行。
通过图3-10可以看到D-Stream Lineage Graph进行整体的流数据的DAG图调度,Taskscheduler负责具体的任务分发,Block tracker进行块管理。在从节点,如果是通过网络输入的流数据会将数据存储两份进行容错。Input receiver源源不断地接收输入流,Task execution负责执行主节点分发的任务,Block manager负责块管理。Spark Streaming整体架构和Spark很相近,很多思想是可以迁移理解的。
3.2.3 Spark Streaming原理剖析
下面将由一个example示例,通过源码呈现Spark Streaming的底层机制。
1.?初始化与接收数据
Spark Streaming通过分布在各个节点上的接收器,缓存接收到的流数据,并将数据包装成Spark能够处理的RDD的格式,输入到Spark Streaming,之后由Spark Streaming将作业提交到Spark集群进行执行,如图3-11所示。


《Spark大数据分析实战》——3.2节Spark Streaming

初始化的过程主要可以概括为两点。
1)调度器的初始化。
调度器调度Spark Streaming的运行,用户可以通过配置相关参数进行调优。
2)将输入流的接收器转化为RDD在集群进行分布式分配,然后启动接收器集合中的每个接收器。
针对不同的数据源,Spark Streaming提供了不同的数据接收器,分布在各个节点上的每个接收器可以认为是一个特定的进程,接收一部分流数据作为输入。
用户也可以针对自身生产环境状况,自定义开发相应的数据接收器。
如图3-12所示,接收器分布在各个节点上。通过下面代码,创建并行的、在不同Worker节点分布的receiver集合。

val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, 
Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
// 在这里创造RDD相当于进入SparkContext.makeRDD
// 此处将receivers的集合作为一个RDD进行分区RDD[Receiver]
// 即使是只有一个输入流,按照这个分布式也是流的输入端在worker而不再Master 
…
// 将receivers的集合打散,然后启动它们
…
ssc.sparkContext.runJob(tempRDD, startReceiver)
…
    }


《Spark大数据分析实战》——3.2节Spark Streaming

2.?数据接收与转化
在“初始化与接收数据”部分中已经介绍过,receiver集合转换为RDD,在集群上分布式地接收数据流。那么每个receiver是怎样接收并处理数据流的呢?读者可以通过图3-13,对输入流的处理有一个全面的了解。图3-13为Spark Streaming数据接收与转化的示意图。
图3-13的主要流程如下。
1)数据缓冲:在receiver的receive函数中接收流数据,将接收到的数据源源不断地放入到BlockGenerator.currentBuffer。
2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器(RecurringTimer),将当前缓冲区中的数据以用户定义的时间间隔封装为一个数据块Block,放入到BlockGenerator的blocksForPush队列中(这个队列)。
3)数据块转化为Spark数据块:在BlockGenerator中有一个BlockPushingThread线程,不断地将blocksForPush队列中的块传递给BlockManager,让BlockManager将数据存储为块。BlockManager负责Spark中的块管理。
4)元数据存储:在pushArrayBuffer方法中还会将已经由BlockManager存储的元数据信息(例如:Block的id号)传递给ReceiverTracker,ReceiverTracker会将存储的blockId放到对应StreamId的队列中。


《Spark大数据分析实战》——3.2节Spark Streaming

图中部分组件的作用如下:

KeepPushingBlocks:调用此方法持续写入和保持数据块。
pushArrayBuffer:调用pushArrayBuffer方法将数据块存储到BlockManager中。
reportPushedBlock:存储完成后汇报数据块信息到主节点。
receivedBlockInfo(Meta Data):已经接收到的数据块元数据记录。
streamId:数据流Id。
BlockInfo:数据块元数据信息。
BlockManager.put:数据块存储器写入备份数据块到其他节点。
Receiver:数据块接收器,接收数据块。
BlockGenerator:数据块生成器,将数据缓存生成Spark能处理的数据块。
BlockGenerator.currentBuffer:缓存网络接收的数据记录,等待之后转换为Spark的数据块。
BlockGenerator.blocksForPushing:将一块连续数据记录暂存为数据块,待后续转换为Spark能够处理的BlockManager中的数据块(A Block As a BlockManager’s Block)。
BlockGenerator.blockPushingThread:守护线程负责将数据块转换为BlockManager中数据块。
ReceiveTracker:输入数据块的元数据管理器,负责管理和记录数据块。
BlockManager:Spark数据块管理器,负责数据块在内存或磁盘的管理。
RecurringTimer:时间触发器,每隔一定时间进行缓存数据的转换。
上面的过程中涉及最多的类就是BlockGenerator,在数据转化的过程中其扮演者不可或缺的角色。
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
  ) extends Logging 

感兴趣的读者可以参照图中所示的类和方法进行更加具体的机制的了解。篇幅所限,对这个数据生成过程不再做具体的代码剖析。
3.?生成RDD与提交Spark Job
Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的Action提交Job,Job被提交到Job Manager中的Job Queue中由Job Scheduler调度,之后Job Scheduler将Job提交到Spark的Job调度器,然后将Job转换为大量的任务分发给Spark集群执行,如图3-14所示。


《Spark大数据分析实战》——3.2节Spark Streaming

Job generator中通过下面的方法生成Job进行调度和执行。
从下面的代码可以看出job是从outputStream中生成的,然后再触发反向回溯执行整个DStream DAG,类似RDD的机制。

private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
// 获取输入数据块的元数据信息
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
         . . .
        }.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
eventActor !DoCheckpoint(time)
  }
// 下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark  
   Streaming调度的核心组件
def submitJobSet(jobSet: JobSet) {
    . . .
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
 . . .
  }

// 进入Graph生成job的方法,Graph本质是DStreamGraph类生成的对象
final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
  . . .
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
   . . .
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
   . . .
  }

// outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟

private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
// 此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个Job
context.sparkContext.runJob(rdd, emptyFunc)
        }
Some(new Job(time, jobFunc))
      }
case None => None
    }
  }

// 在DStream算是父类,一些具体的DStream例如SocketInputStream等的类的父类可以通过
  SocketInputDStream看是如何通过上面的getOrCompute生成RDD的
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {

generatedRDDs.get(time) match {
      . . .
case None => {
if (isTimeValid(time)) {

// Dstream是个父类,这里代表的是子类的compute方法,DStream通过compute调用用户自定义函数。当任务执行时,同一个stage中的DStream函数会串联依次执行
compute(time) match {
            . . .
generatedRDDs.put(time, newRDD)
           . . .
  }
在SocketInputDStream的compute方法中生成了对应时间片的RDD:
override def compute(validTime: Time): Option[RDD[T]] = {
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }

Spark Streaming在保证实时处理的要求下还能够保证高吞吐与容错性。用户的数据分析中很多情况下也存在需要分析图数据,运行图算法,通过GraphX可以简便地开发分布式图分析算法。

上一篇:物联网平台结合区块链的落地场景分析


下一篇:“安装程序无法定位现有系统分区,也无法创建新的系统分区”提示