本节主要内容
本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html
- DStream Transformation操作
1. Transformation操作
Transformation | Meaning |
---|---|
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream. |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
count() | 通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 |
reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream |
cogroup(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
updateStateByKey(func) | 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream |
具体示例:
//读取本地文件~/streaming文件夹
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordMap = words.map(x => (x, 1))
val wordCounts=wordMap.reduceByKey(_ + _)
val filteredWordCounts=wordCounts.filter(_._2>1)
val numOfCount=filteredWordCounts.count()
val countByValue=words.countByValue()
val union=words.union(word1)
val transform=words.transform(x=>x.map(x=>(x,1)))
//显式原文件
lines.print()
//打印flatMap结果
words.print()
//打印map结果
wordMap.print()
//打印reduceByKey结果
wordCounts.print()
//打印filter结果
filteredWordCounts.print()
//打印count结果
numOfCount.print()
//打印countByValue结果
countByValue.print()
//打印union结果
union.print()
//打印transform结果
transform.print()
下面的代码是运行时添加的文件内容
root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt
下面是前面各个函数的结果
-------------------------------------------
lines.print()
-------------------------------------------
A B C D
A B
-------------------------------------------
flatMap结果
-------------------------------------------
A
B
C
D
A
B
-------------------------------------------
map结果
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)
-------------------------------------------
reduceByKey结果
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)
-------------------------------------------
filter结果
-------------------------------------------
(B,2)
(A,2)
-------------------------------------------
count结果
-------------------------------------------
2
-------------------------------------------
countByValue结果
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)
-------------------------------------------
union结果
-------------------------------------------
A
B
C
D
A
B
A
B
C
D
...
-------------------------------------------
transform结果
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)
示例2:
上节课中演示的WordCount代码并没有只是对输入的单词进行分开计数,没有记录前一次计数的状态,如果想要连续地进行计数,则可以使用updateStateByKey方法来进行。下面的代码主要给大家演示如何updateStateByKey的方法,
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
System.exit(1)
}
//函数字面量,输入的当前值与前一次的状态结果进行累加
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
//输入类型为K,V,S,返回值类型为K,S
//V对应为带求和的值,S为前一次的状态
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")
//每一秒处理一次
val ssc = new StreamingContext(sparkConf, Seconds(1))
//当前目录为checkpoint结果目录,后面会讲checkpoint在Spark Streaming中的应用
ssc.checkpoint(".")
//RDD的初始化结果
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
//使用Socket作为输入源,本例ip为localhost,端口为9999
val lines = ssc.socketTextStream(args(0), args(1).toInt)
//flatMap操作
val words = lines.flatMap(_.split(" "))
//map操作
val wordDstream = words.map(x => (x, 1))
//updateStateByKey函数使用
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
下图是初始时的值:
使用下列命令启动netcat server
root@sparkmaster:~/streaming# nc -lk 9999
然后输入
root@sparkmaster:~/streaming# nc -lk 9999
hello
将得到下图的结果
然后再输入world,
root@sparkmaster:~/streaming# nc -lk 9999
hello
world
则将得到下列结果