在使用SparkStreaming时,使用StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)创建StreamingContext。代码示例如下:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("UserBrowse")
val ssc = new StreamingContext(conf, batchInterval)
//通过LogHubCursorPosition.BEGIN_CURSOR指定消费Cursor的模式。
val loghubStream = LoghubUtils.createStream(...)
loghubStream.checkpoint(batchInterval * 5).foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
此时通过控制台提交Spark任务命令如下:
--class com.test.StreamingText
--jars /spark-it/loghub-spark-0.6.13_2.4.3-1.0.4.jar,/spark-it/loghub-client-lib-0.6.13.jar
--driver-memory 1G
--driver-cores 1
--executor-cores 3
--executor-memory 3G
--num-executors 1
--name spark_on_loghub
/spark-it/sparkstreaming-0.0.1-SNAPSHOT.jar
/tmp/checkpoint_location_test
其中/tmp/checkpoint_location_test 为StreamingContext的checkpoint路径。
运行一段时间后,用户期望修改executor-cores为4,executor-memory 为12G,num-executors为3。那如何修改呢?
由于SparkStreming的运行机制是长久运行,以及checkpoint的设置是为了任务异常能从checkpoint恢复数据。
首次提交任务后,StremingContext会把Spark的配置信息写入到Checkpoint中,包括:executor-cores、num-executors、executor-memory等配置信息。
当任务异常或者重启后,StremingContext会从Checkpoint中读取Spark的配置信息。所以这时如果在控制台修改executor-cores、executor-memory等配置信息,StremingContext不会读取的。
如果需要修改executor-cores、executor-memory等配置信息需要清除Checkpoint路径,或者重新指定一个新的Checkpoint路径。