用过Spark Streaming的应该都比较了解checkpoint机制。对于Spark Structured Streaming 假如存在聚合函数,join等操作的时候实际上也是要维护中间状态的,这种情况下就需要开启checkpoint。当然,即使没有非状态的算子,由于Structured Streaming是自己管理offset的,不会将offset提交到kafka或者zk,所以为了恢复的时候从上次位置重启,也要开启checkpoint。
使用Checkpoint和预写日志,如果发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行。通过给查询任务配置checkpointLocation参数,开启checkpoint,查询任务将将所有进度信息(即,每次触发中处理的偏移范围)和运行聚合保存到checkpoint。此检checkpoint存储位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时将其设置为DataStreamWriter中的选项。
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
具体测试代码如下:
package bigdata.spark.StructuredStreaming.KafkaSourceOperator
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object StructuredKafkaWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
val spark = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mt-mdh.local:9093")
.option("subscribe", "Stream_Static_Join")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// Generate running word count
val wordCounts = lines
.flatMap(_.split(" "))
.groupBy("value")
.count()
// Start running the query that prints the running counts to the console
val query = wordCounts
.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}