1、计算原理
- Sparkstreaming处理数据可以分为实时流或者流
- Sparkstreaming从flume或者kafka中拉取数据,而Sparkstreaming中会创建多个窗口,以RDD的形式存放这些数据,然后开始处理这些数据
- Sparkstreaming含有一个特有的算子updateStateByKey,就是在state中累计之前窗口中的数据。
- 如上图所示,窗口1先进行数据的统计,然后将数据放入到state中,然后,进行窗口2的数据统计,然后将state中的数据进行累加统计,依次类推,一直将所有的窗口内的数据统计完成,最终state中的数据即为统计结果。
2、代码实现
package com.njbdqn
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object MySs {
//自定义函数:将各个窗口中的数据根据分组的key值累加
//String:key值
//Seq[Int]:窗口中的RDD数据
//Option[Int]:state中的累加数据
val addFunc = (it:Iterator[(String,Seq[Int],Option[Int])])=>{
it.map(x=>{
(x._1,x._2.sum+x._3.getOrElse(0))
})
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("myss").setMaster("local[*]")
val sc = new SparkContext(conf)
//设置读取数据时间为5秒,5秒创建一个窗口,统计时也是一个窗口一个窗口开始统计
val ssc = new StreamingContext(sc,Seconds(5))
//拉取socket信息,
//在Linux下面安装netcat工具,进行网络数据传输
//当我们在linux中传输数据后,这边SparkStreaming接收到数据后开始计数
//192.168.153.200:虚拟机IP
//1234:端口号
val ds = ssc.socketTextStream("192.168.153.200", 1234)
//统计所有窗口的数据
//新建一个检查点路径,来统计各个state的数据统计值
sc.setCheckpointDir("E:\\BigDataStudy\\SparkStreaming\\cks")
val res = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(addFunc,
new HashPartitioner(sc.defaultMinPartitions), true)
res.print()
ssc.start()
ssc.awaitTermination()
}
}
- 运行该代码后,我们发现Sparkstreaming开始i进行数据处理,每5秒钟为一个窗口开始计算,由于还没有数据,所有没有计算结果
3、安装NetCat,进行数据传输
yum install nmap-ncat.x86_64
- 启动netcat,设置端口号为1234,要与代码中的端口号一致
nc -l 1234
- 开始传输数据,此时我们发现,只要我们传输数据,Sparkstreaming就会帮我们累加每个窗口的数据的个数,最后得到总数据。
-
注意:这边我们设置数据读取统计时间为5秒,也就是说,我们在每个5秒内传输的数据会被放到一个窗口中进行统计。然后将这些窗口进行累加计算,获得每个单词的数量。