(14) Monitoring and Managing Streaming Queries

Managing Queries

Management of streaming queries is handled mainly by the StreamingQueryManager class. This object can be obtained from the SparkSession, and the main operations it exposes are listed below.
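As a sketch based on the Spark 2.x API, the main methods of StreamingQueryManager are the following:

val spark: SparkSession = ...

spark.streams.active                   // Array[StreamingQuery]: all currently active queries
spark.streams.get(id)                  // look up an active query by its UUID (or its string form)
spark.streams.awaitAnyTermination()    // block until any one query terminates
spark.streams.resetTerminated()        // forget past terminated queries
spark.streams.addListener(listener)    // attach a StreamingQueryListener for async callbacks
spark.streams.removeListener(listener) // detach a previously attached listener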


The most important of these are addListener and removeListener, which let us receive detailed information about every processed batch. The information that can be obtained through a listener is outlined below.
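A listener implements the three callbacks defined by the abstract class StreamingQueryListener, and the progress callback carries a StreamingQueryProgress object. A minimal skeleton (field list per the Spark 2.x API):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}   // called once when a query starts
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress is a StreamingQueryProgress with fields such as:
    // id, runId, name, timestamp, batchId, durationMs, eventTime,
    // numInputRows, inputRowsPerSecond, processedRowsPerSecond,
    // stateOperators, sources (per-source offsets and rates), sink
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} // called when a query stops or fails
}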


A StreamingQuery object is created when a query is started, and it can be used to monitor and manage that query. As described above, it can also be obtained through StreamingQueryManager's get method, provided the query's UUID or id was saved beforehand.

The StreamingQuery trait has four implementation classes; the one you will usually encounter is StreamingQueryWrapper.


The operations you can perform on a streaming query through its StreamingQuery object are shown below:

val query = df.writeStream.format("console").start()   // get the query object

query.id            // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId         // get the unique id of this run of the query, which will be generated at every start/restart
query.name          // get the auto-generated or user-specified name
query.explain()     // print detailed explanations of the query
query.stop()        // stop the query
query.awaitTermination()   // block until query is terminated, with stop() or with error
query.exception     // the exception if the query has been terminated with error
query.recentProgress  // an array of the most recent progress updates for this query
query.lastProgress    // the most recent progress update of this streaming query

Any number of queries can be started within a single SparkSession. They run concurrently, sharing the cluster's resources. You can use spark.streams to obtain the StreamingQueryManager, which manages the currently active queries:

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries
spark.streams.get(id)   // get a query object by its unique id
spark.streams.awaitAnyTermination()   // block until any one of them terminates
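awaitAnyTermination() rethrows the exception of a query that failed, so if you want to keep waiting for the remaining queries after a failure, one pattern (a sketch, combining it with resetTerminated from the same API) is to catch the error and reset before waiting again:

import org.apache.spark.sql.streaming.StreamingQueryException

while (spark.streams.active.nonEmpty) {
  try {
    spark.streams.awaitAnyTermination()   // returns (or throws) when any query terminates
  } catch {
    case e: StreamingQueryException => println("a query failed: " + e.getMessage)
  }
  spark.streams.resetTerminated()         // forget finished queries before the next wait
}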

 

Monitoring Streaming Queries

There are two APIs for monitoring and debugging active queries: an interactive one and an asynchronous one.

1. Interactive APIs

You can directly get the current status and metrics of an active query with streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java, and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream: what data was processed, the processing rates, latencies, and so on. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates.

In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java, and a dictionary with the same fields in Python. It gives information about what the query is doing at that moment: whether a trigger is currently active, whether data is being processed, and so on.

Here are a few examples:

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/

println(query.status)
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
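Besides printing the whole JSON, the same metrics can be read field by field from the returned StreamingQueryProgress object; a small sketch (field names per the Spark 2.x API):

val progress = query.lastProgress   // the most recent StreamingQueryProgress (null before the first trigger)
if (progress != null) {
  println("batch " + progress.batchId + " read " + progress.numInputRows + " rows")
  println("input rate:      " + progress.inputRowsPerSecond + " rows/s")
  println("processing rate: " + progress.processedRowsPerSecond + " rows/s")
}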

 

2. Asynchronous API

You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (see the Scala/Java docs). Once you attach your custom StreamingQueryListener object with sparkSession.streams.addListener(), you will get callbacks when a query is started and stopped and when progress is made on an active query:

 

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("Query started ! ")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("event.progress.batchId ===========> "+event.progress.batchId)
    println("event.progress.durationMs ===========> "+event.progress.durationMs)
    println("event.progress.eventTime ===========> "+event.progress.eventTime)
    println("event.progress.id ===========> "+event.progress.id)
    println("event.progress.name ===========> "+event.progress.name)
    println("event.progress.sink.json ===========> "+event.progress.sink.json)
    println("event.progress.sources.length ===========> "+event.progress.sources.length)
    println("event.progress.sources(0).description ===========> "+event.progress.sources(0).description)
    println("event.progress.sources(0).inputRowsPerSecond ===========> "+event.progress.sources(0).inputRowsPerSecond)
    println("event.progress.sources(0).numInputRows ===========> "+event.progress.sources(0).numInputRows)
    println("event.progress.sources(0).startOffset ===========> "+event.progress.sources(0).startOffset)
    println("event.progress.sources(0).processedRowsPerSecond ===========> "+event.progress.sources(0).processedRowsPerSecond)
    println("event.progress.sources(0).endOffset ===========> "+event.progress.sources(0).endOffset)

    println("event.progress.processedRowsPerSecond ===========> "+event.progress.processedRowsPerSecond)
    println("event.progress.timestamp ===========> "+event.progress.timestamp)
    println("event.progress.stateOperators.size ===========> "+event.progress.stateOperators.size)
    println("event.progress.inputRowsPerSecond ===========> "+event.progress.inputRowsPerSecond)

  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("Query stopped ! ")
  }

})

A complete working example:

 

package bigdata.spark.StructuredStreaming.MMOperator

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}

object SQListener {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName(this.getClass.getName)
      .setMaster("yarn") // "yarn-client" is not a valid master in Spark 2.x; use "yarn" with client deploy mode
      .set("spark.submit.deployMode", "client")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    import spark.implicits._
    // Attach the same StreamingQueryListener shown above; abbreviated here to a few
    // representative progress fields rather than repeating the full listing.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("Query started!")
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("event.progress.batchId ===========> " + event.progress.batchId)
        println("event.progress.numInputRows ===========> " + event.progress.numInputRows)
        println("event.progress.inputRowsPerSecond ===========> " + event.progress.inputRowsPerSecond)
        println("event.progress.processedRowsPerSecond ===========> " + event.progress.processedRowsPerSecond)
        println("event.progress.sources(0).startOffset ===========> " + event.progress.sources(0).startOffset)
        println("event.progress.sources(0).endOffset ===========> " + event.progress.sources(0).endOffset)
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("Query stopped!")
      }
    })
    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "split_test")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts
      .writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
    println(query.id)
    println(spark.streams.get(query.id).id)
    query.awaitTermination()
  }

}

 

The recommended approach is to use a listener to monitor the state of streaming queries.
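When monitoring is no longer needed, a listener can be detached again; a minimal sketch using the addListener/removeListener pair (the value name monitor is arbitrary):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val monitor = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = println("started: " + event.id)
  override def onQueryProgress(event: QueryProgressEvent): Unit = println("batch: " + event.progress.batchId)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = println("terminated: " + event.id)
}

spark.streams.addListener(monitor)     // start receiving callbacks
// ... run streaming queries ...
spark.streams.removeListener(monitor)  // stop receiving callbacks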
