状态流累加

object Socket_Streaming_State {
  // The snippet has no file-level import block, so the Spark names are imported here.
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.DStream

  /**
   * State-update function for `updateStateByKey`: adds the values that arrived for a key in
   * the current batch to the total accumulated for that key in earlier batches.
   *
   * @param currentValue values received for the key in this batch, e.g. hello -> [1, 1, 2]
   * @param preValue     the key's previously accumulated state, e.g. hello -> 5;
   *                     `None` the first time the key is seen
   * @return the new running total, always a `Some`, so no key is ever dropped from the state
   */
  def updateFunc(currentValue: Seq[Int], preValue: Option[Int]): Option[Int] =
    Some(currentValue.sum + preValue.getOrElse(0))

  /**
   * Reads text lines from a TCP socket in 2-second batches, splits them into words on single
   * spaces, and maintains a running count per word across all batches (stateful word count).
   *
   * Uses an explicit `main` instead of `extends App`, as Spark recommends for applications.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hdfs")
      // Config key needs a dot; the original "spark.executor memory" was silently ignored.
      .set("spark.executor.memory", "4g")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // updateStateByKey carries state between batches, so a checkpoint directory is mandatory.
    ssc.checkpoint("file:///D:/Practise/chekpoint")

    val inputDstream = ssc.socketTextStream("192.168.56.100", 9999)
    val wordDstream: DStream[String] = inputDstream.flatMap(_.split(" "))
    val wordAndOneDstream: DStream[(String, Int)] = wordDstream.map((_, 1))

    // Groups each batch's (word, 1) pairs by key and folds them into the stored total.
    val value: DStream[(String, Int)] = wordAndOneDstream.updateStateByKey[Int](updateFunc _)

    value.print()
    // With an empty prefix, Spark names each batch's output directory by the bare batch
    // timestamp; a prefix yields "wordcount-<batchTimeMs>" instead.
    value.saveAsTextFiles("wordcount", "")

    ssc.start()
    ssc.awaitTermination()
  }
}

上一篇:SparkStreaming的DStream转换操作


下一篇:利用JQuery jsonp实现Ajax跨域请求 .Net 的*.handler 和 WebService,返回json数据