/**
 * Stateful word count over a socket text stream.
 *
 * Reads lines from a TCP socket, splits them into words on single spaces and
 * keeps a running count per word across batches with `updateStateByKey`.
 * The running counts are written out as text files on every batch.
 *
 * The entry point is an explicit `main` method rather than `extends App`:
 * Spark's documentation advises against `scala.App` for applications because
 * its delayed initialisation may not work correctly with Spark.
 */
object Socket_Streaming_State {

  /**
   * State update function handed to `updateStateByKey`.
   *
   * @param currentValue the values that arrived for one key in the current
   *                     batch, e.g. `[1, 1, 2]` for the key "hello"
   * @param preValue     the total accumulated for that key so far, e.g.
   *                     `Some(5)`; `None` is treated as 0
   * @return the new running total (sum of the current values plus the
   *         previous total), always wrapped in `Some`
   */
  def updateFunc(currentValue: Seq[Int], preValue: Option[Int]): Option[Int] = {
    val currSum = currentValue.sum
    val pre = preValue.getOrElse(0)
    Some(currSum + pre)
  }

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hdfs")
      // The key is "spark.executor.memory" (dot-separated); a key containing
      // a space is not a recognised Spark setting.
      .set("spark.executor.memory", "4g")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // Batch interval of 2 seconds.
    val ssc = new StreamingContext(sc, Seconds(2))

    // A checkpoint directory must be set: updateStateByKey carries state
    // from one batch to the next.
    ssc.checkpoint("file:///D:/Practise/chekpoint")

    val inputDstream = ssc.socketTextStream("192.168.56.100", 9999)

    // One record per word, then pair every word with an initial count of 1.
    val wordDstream: DStream[String] = inputDstream.flatMap(_.split(" "))
    val wordAndOneDstream: DStream[(String, Int)] = wordDstream.map((_, 1))

    // updateStateByKey performs a per-key reduce over the DStream's data,
    // combining the values new in this batch with the previous state.
    val value: DStream[(String, Int)] = wordAndOneDstream.updateStateByKey(updateFunc)

    // NOTE(review): both the prefix and the suffix are empty, so the output
    // location is derived from the batch time only and is presumably resolved
    // against the working directory - confirm this is the intended target.
    value.saveAsTextFiles("", "")

    ssc.start()
    ssc.awaitTermination()
  }
}