使用trigger
package com.qf.sparkstreaming.day04
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
/**
* trigger函数:
* sparkStreaming是一个准实时的计算框架,微批处理
* structuredStreaming是一个实时的计算框架,但是底层使用的sparksql的api,
* 并且是sparkStreaming的进化版,比微批处理更快,也有微小的时间段,最快可以达到 `100ms` 左右的端到端延迟。
* 而使用trigger函数可以做到1ms的端到端延迟。
*/
object _09Trigger {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
session.sparkContext.setLogLevel("ERROR")
//作为消费者,从kafka读取数据,获取到的数据有schema,
// 分别是 key|value|topic|partition|offset|timestamp|timestampType|
val frame: DataFrame = session.readStream.format("kafka")
.option("kafka.bootstrap.servers","qianfeng01:9092,qianfeng02:9092,qianfeng03:9092")
// .option("startingOffsets","earliest")
.option("subscribe","pet").load()
//处理一下数据
val frame1: DataFrame = frame.selectExpr("cast(value as String)")
//保存到kafka中
frame1.writeStream
.format("console")
.trigger(Trigger.ProcessingTime(0))
.start()
.awaitTermination()
}
}