Managing Streaming Queries
Streaming queries are managed mainly through the StreamingQueryManager class, which can be obtained from the SparkSession. Its most important operations are adding and removing listeners, which give us detailed information about the data processed in each batch; the fields a listener can report are the StreamingQueryProgress metrics shown in the monitoring section below.
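As a quick reference, here is a minimal sketch of the main operations the manager exposes (spark, queryId and myListener are placeholders, not part of the original text; the method names are from the public Scala API):
val manager = spark.streams            // obtain the StreamingQueryManager from the SparkSession
manager.active                         // all currently active queries in this session
manager.get(queryId)                   // look a query up by its UUID (a String id also works)
manager.awaitAnyTermination()          // block until any one active query terminates
manager.resetTerminated()              // forget past terminated queries so the wait can be reused
manager.addListener(myListener)        // attach a StreamingQueryListener for asynchronous callbacks
manager.removeListener(myListener)     // detach a previously attached listener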
A StreamingQuery object is created when a query is started and can be used to monitor and manage that query. As noted above, it can also be retrieved later through the StreamingQueryManager's get method, provided the query's UUID (or id string) was saved first.
The StreamingQuery trait has several implementation classes; the one you will normally see is StreamingQueryWrapper.
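For example, a minimal sketch of saving the id at start time and looking the query up again through the manager (df and the query name are placeholders):
val query = df.writeStream.queryName("myQuery").format("console").start()
val savedId = query.id                     // UUID that persists across restarts from checkpoint data
val sameQuery = spark.streams.get(savedId) // retrieve the running query again by its id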
The operations you can perform on a streaming query through a StreamingQuery object are shown below:
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the auto-generated or user-specified name of the query
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
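Because awaitTermination() rethrows the error that stopped a failed query, a common pattern is to wrap it and inspect the exception; a minimal sketch (how to react to the failure is up to the application):
import org.apache.spark.sql.streaming.StreamingQueryException
try {
  query.awaitTermination()                   // returns normally after stop(), throws if the query failed
} catch {
  case e: StreamingQueryException =>
    println("Query failed: " + e.getMessage) // the same exception is also available via query.exception
}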
Any number of queries can be started within a single SparkSession. They run concurrently, sharing the cluster's resources. sparkSession.streams gives you the StreamingQueryManager, which can be used to manage the currently active queries.
val spark: SparkSession = ...
spark.streams.active // get the list of currently active streaming queries
spark.streams.get(id) // get a query object by its unique id
spark.streams.awaitAnyTermination() // block until any one of them terminates
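A minimal sketch of driving two queries in one session (df1 and df2 are placeholder streaming DataFrames): start both, then loop on awaitAnyTermination, calling resetTerminated before waiting again so the remaining query keeps running:
val q1 = df1.writeStream.format("console").start()   // first query
val q2 = df2.writeStream.format("console").start()   // second query, shares the same session and cluster
while (spark.streams.active.nonEmpty) {
  spark.streams.awaitAnyTermination()                // unblocks as soon as any one query terminates
  spark.streams.resetTerminated()                    // clear it so the next wait only watches live queries
}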
Monitoring Streaming Queries
There are two APIs for monitoring and debugging active queries: interactively and asynchronously.
1. Interactive APIs
You can directly get the current status and metrics of an active query using streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java, and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream - what data was processed, the processing rates, latencies, and so on. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates.
In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java, and a dictionary with the same fields in Python. It gives information about what the query is doing right now - whether a trigger is active, whether data is being processed, and so on.
Here are a few examples.
val query: StreamingQuery = ...
println(query.lastProgress)
/* Will print something like the following.
{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/
println(query.status)
/* Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
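The progress objects can also be consumed programmatically instead of being printed as JSON; a minimal sketch that pulls a few fields out of recentProgress (field names as defined on StreamingQueryProgress):
query.recentProgress.foreach { p =>
  val triggerMs = p.durationMs.get("triggerExecution")   // per-phase durations, keyed by phase name
  println(s"batch ${p.batchId}: ${p.numInputRows} rows, ${p.processedRowsPerSecond} rows/s, trigger $triggerMs ms")
}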
2. Asynchronous API
You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (Scala/Java docs). Once you register a custom StreamingQueryListener object with sparkSession.streams.addListener(), you will get callbacks when a query is started and stopped and when progress is made in an active query.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("Query started ! ")
  }
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("event.progress.batchId ===========> "+event.progress.batchId)
    println("event.progress.durationMs ===========> "+event.progress.durationMs)
    println("event.progress.eventTime ===========> "+event.progress.eventTime)
    println("event.progress.id ===========> "+event.progress.id)
    println("event.progress.name ===========> "+event.progress.name)
    println("event.progress.sink.json ===========> "+event.progress.sink.json)
    println("event.progress.sources.length ===========> "+event.progress.sources.length)
    println("event.progress.sources(0).description ===========> "+event.progress.sources(0).description)
    println("event.progress.sources(0).inputRowsPerSecond ===========> "+event.progress.sources(0).inputRowsPerSecond)
    println("event.progress.sources(0).numInputRows ===========> "+event.progress.sources(0).numInputRows)
    println("event.progress.sources(0).startOffset ===========> "+event.progress.sources(0).startOffset)
    println("event.progress.sources(0).processedRowsPerSecond ===========> "+event.progress.sources(0).processedRowsPerSecond)
    println("event.progress.sources(0).endOffset ===========> "+event.progress.sources(0).endOffset)
    println("event.progress.processedRowsPerSecond ===========> "+event.progress.processedRowsPerSecond)
    println("event.progress.timestamp ===========> "+event.progress.timestamp)
    println("event.progress.stateOperators.size ===========> "+event.progress.stateOperators.size)
    println("event.progress.inputRowsPerSecond ===========> "+event.progress.inputRowsPerSecond)
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("Query stopped ! ")
  }
})
A complete working example:
package bigdata.spark.StructuredStreaming.MMOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}

object SQListener {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    import spark.implicits._

    // Register the listener before starting the query so that every batch's metrics are reported
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("Query started ! ")
      }
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("event.progress.batchId ===========> "+event.progress.batchId)
        println("event.progress.durationMs ===========> "+event.progress.durationMs)
        println("event.progress.eventTime ===========> "+event.progress.eventTime)
        println("event.progress.id ===========> "+event.progress.id)
        println("event.progress.name ===========> "+event.progress.name)
        println("event.progress.sink.json ===========> "+event.progress.sink.json)
        println("event.progress.sources.length ===========> "+event.progress.sources.length)
        println("event.progress.sources(0).description ===========> "+event.progress.sources(0).description)
        println("event.progress.sources(0).inputRowsPerSecond ===========> "+event.progress.sources(0).inputRowsPerSecond)
        println("event.progress.sources(0).numInputRows ===========> "+event.progress.sources(0).numInputRows)
        println("event.progress.sources(0).startOffset ===========> "+event.progress.sources(0).startOffset)
        println("event.progress.sources(0).processedRowsPerSecond ===========> "+event.progress.sources(0).processedRowsPerSecond)
        println("event.progress.sources(0).endOffset ===========> "+event.progress.sources(0).endOffset)
        println("event.progress.processedRowsPerSecond ===========> "+event.progress.processedRowsPerSecond)
        println("event.progress.timestamp ===========> "+event.progress.timestamp)
        println("event.progress.stateOperators.size ===========> "+event.progress.stateOperators.size)
        println("event.progress.inputRowsPerSecond ===========> "+event.progress.inputRowsPerSecond)
      }
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("Query stopped ! ")
      }
    })

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "split_test")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts
      .writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()

    // The running query can also be looked up again by its id through the StreamingQueryManager
    println(query.id)
    println(spark.streams.get(query.id).id)

    query.awaitTermination()
  }
}
The recommended way to monitor the state of streaming queries is the listener approach.
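If the listener is meant to feed an external monitoring system, one convenient option is the compact JSON form of each progress report; a minimal sketch (MetricsListener and report are illustrative names, and the imports are the same as in the example above):
class MetricsListener(report: String => Unit) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit =
    report(event.progress.json)            // the whole progress report for the batch as one JSON string
}
spark.streams.addListener(new MetricsListener(json => println(json)))   // swap println for a real metrics sink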