以下是对StreamingListener的研究。由于比较简单,故只贴代码,不做解释。
/**
 * Created by gabry.wu on 2016/5/27.
 *
 * A [[StreamingListener]] that logs Spark Streaming lifecycle events so the
 * state of streaming jobs can be monitored:
 *   - receiver started / error / stopped,
 *   - batch submitted / started / completed,
 *   - output operation started / completed.
 *
 * Every callback emits two WARN log lines: the callback name, then a
 * `key=value,...` dump of the event's info object.
 *
 * @param ssc the streaming context being monitored. It is accepted so that a
 *            callback could stop the current Spark Streaming application when
 *            certain errors occur. NOTE: no callback in this class uses it yet.
 */
class StreamingMonitor(ssc: StreamingContext) extends StreamingListener {
  // Info types needed by the formatting helpers below.
  import org.apache.spark.streaming.scheduler.{BatchInfo, OutputOperationInfo, ReceiverInfo}

  private val log = LoggerFactory.getLogger("SparkStreamingMonitor")

  /** Renders the fields of a receiver's info as one `key=value,...` string. */
  private def describeReceiver(info: ReceiverInfo): String =
    s"active=${info.active},executorId=${info.executorId}," +
      s"lastError=${info.lastError},lastErrorMessage=${info.lastErrorMessage}," +
      s"location=${info.location},name=${info.name}," +
      s"streamId=${info.streamId}"

  /** Renders a batch's size and timing fields as one `key=value,...` string. */
  private def describeBatch(info: BatchInfo): String =
    s"batchTime=${info.batchTime},numRecords=${info.numRecords}," +
      s"processingDelay=${info.processingDelay},processingEndTime=${info.processingEndTime}," +
      s"processingStartTime=${info.processingStartTime},schedulingDelay=${info.schedulingDelay}," +
      s"submissionTime=${info.submissionTime},totalDelay=${info.totalDelay}"

  /** Renders an output operation's fields as one `key=value,...` string. */
  private def describeOutputOperation(info: OutputOperationInfo): String =
    s"batchTime=${info.batchTime},description=${info.description}," +
      s"duration=${info.duration},endTime=${info.endTime}," +
      s"failureReason=${info.failureReason},id=${info.id}," +
      s"name=${info.name},startTime=${info.startTime}"

  // Receiver started.
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    log.warn("onReceiverStarted")
    log.warn(describeReceiver(receiverStarted.receiverInfo))
  }

  // Receiver reported an error; receiver failures can be handled here.
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    log.warn("onReceiverError")
    log.warn(describeReceiver(receiverError.receiverInfo))
  }

  // Receiver stopped.
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    log.warn("onReceiverStopped")
    log.warn(describeReceiver(receiverStopped.receiverInfo))
  }

  // Batch submitted. The amount of data is already known before the jobs are
  // submitted: batchInfo.numRecords is the number of records in this batch.
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    log.warn("onBatchSubmitted")
    log.warn(describeBatch(batchSubmitted.batchInfo))
  }

  // Batch started. batchInfo.schedulingDelay is the time between submission
  // and the actual start of the batch.
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    log.warn("onBatchStarted")
    log.warn(describeBatch(batchStarted.batchInfo))
  }

  // Batch completed. batchInfo.processingDelay is the batch processing time;
  // batchInfo.totalDelay is the total time from submission to completion.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    log.warn("onBatchCompleted")
    log.warn(describeBatch(batchCompleted.batchInfo))
  }

  // Output operation started. outputOperationInfo.description is essentially
  // part of the call stack and helps locate the output action;
  // outputOperationInfo.name is the name of the action function.
  override def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    log.warn("onOutputOperationStarted")
    log.warn(describeOutputOperation(outputOperationStarted.outputOperationInfo))
  }

  // Output operation completed. outputOperationInfo.duration is the time the
  // action took; outputOperationInfo.failureReason is why the action failed,
  // so batch failures can be handled here.
  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    log.warn("onOutputOperationCompleted")
    log.warn(describeOutputOperation(outputOperationCompleted.outputOperationInfo))
  }
}
下面是添加StreamingListener的代码:
// Build the streaming context; the batch interval comes from batchDuration.
val ssc = {
  val batchInterval = new Duration(batchDuration)
  new StreamingContext(sparkConf, batchInterval)
}
// Register the monitor so it receives receiver, batch and output-operation events.
ssc.addStreamingListener(new StreamingMonitor(ssc))
各个函数的调用顺序
onReceiverStarted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->.......->onReceiverStopped
其中[接收到数据]是可选项,因为并不是每个批次都会接收到数据。另外,如果一个批次包含多个输出操作,onOutputOperationStarted->onOutputOperationCompleted会对每个输出操作各调用一次。