感谢DT大数据梦工厂支持提供技术支持,DT大数据梦工厂专注于Spark发行版定制。
本期概览:
数据接收全生命周期的思考
大数据处理框架中,最重要的就是性能,性能是排在前面的。其次再考虑其他的。因为数据量大,一不小心的多余的操作,几分钟,十几分钟就过去了。
根据一般的架构设计原则,接收数据和存储数据是不同的对象来完成的。
Spark Streaming数据接收全生命周期可以看成是一个MVC模式,ReceiverSupervisor相当于是控制器(c),Receiver(v)
首先启动的是ReceiverTracker。
开启通信并且启动receiver执行线程
Start a receiver along with its scheduled executors
Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
要注意的是Receiver是可序列化的,要进行通信
值得注意的是ReceiverSupervisor与ReceiverTracker的消息通信的主要代码如下
/** Divides received data records into data blocks for pushing in BlockManager. */
这里的调用onStart()方法要先于Receiver的onStart()方法,因为Receiver的onStart()方法要用到BlockGenerator等在这里的调用onStart()初始化的值
* Note: Do not create BlockGenerator instances directly inside receivers. Use
* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
这里生动的说明了一个BlockGenerator只服务于一个DStream
Receiver接收数据应该是非阻塞式的,所以应该单独开启一条线程来执行
默认情况 下,每200毫秒产生一个Block,并且在生产环境中有个最佳实践,那就是性能调优的时候spark.streaming.blockInterval最好不要低于50毫秒,因为一般情况下产生的碎片小文件过多,过多的句柄占据内存或者磁盘空间,造成性能下降,当然,根据具体的不同的数据的流入的速度不同,最优化的设置多少时间的数据合并为一个Block是不同的。要根据具体情况具体分析。原则上是产生的文件大小在速度和句柄数量之间平衡。
每隔10毫秒就push数据到磁盘(Block)
发送消息启动所有的receivers
/**
* Start a receiver along with its scheduled executors 将调度的receiver启动
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}