1.说明
DStream的API不够满足使用的时候,可以使用这两个函数,将dstream转换为rdd,然后进行操作
2.transform
transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
3.程序
package com.window.it
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
object TransformDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingWindowOfKafka")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(5))
// 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
// 路径对应的文件夹不能存在
ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712") val kafkaParams = Map(
"group.id" -> "streaming-kafka-78912151",
"zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
"auto.offset.reset" -> "smallest"
)
val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
ssc, // 给定SparkStreaming上下文
kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
topics, // 给定读取对应topic的名称以及读取数据的线程数量
StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
).map(_._2) val resultWordCount = dstream
.filter(line => line.nonEmpty)
.flatMap(line => line.split(" ").map((_, 1)))
.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(15), // 窗口大小
Seconds(10) // 滑动大小
)
resultWordCount.print() // 这个也是打印数据 /**
* transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
*/
dstream.transform(rdd => {
// 对rdd进行预处理
val processedRDD = rdd
.filter(line => line.nonEmpty)
.flatMap(line => line.split(" ").map((_, 1)))
.reduceByKey(_ + _)
// 数据抽样,获取两个节点
val seeder = processedRDD.takeSample(true, 2)
// 对rdd进行处理操作, 将抽样数据和rdd中的数据进行比较,如果rdd中的word的出现次数大于等于抽样数据中的任何一个word的次数,次数*3;否则次数*2
val brocast = rdd.sparkContext.broadcast(seeder)
val resultRDD = processedRDD.mapPartitions(iter => {
val seederValue = brocast.value
iter.map {
case (word, count) => {
val vc = seederValue
.filter(tuple => {
count >= tuple._2
}).size
if (vc == 0) {
(word, 2, count * 2)
} else {
(word, 3, count * 3)
}
}
}
})
resultRDD
}).print() // 启动开始处理
ssc.start()
ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
}
}
4.foreachRDD
作用和transform类型,将DStream的操作转换为RDD进行操作,区别:该api没有返回值
5.程序
package com.window.it import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext} object TransformDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingWindowOfKafka")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(5))
// 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
// 路径对应的文件夹不能存在
ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712") val kafkaParams = Map(
"group.id" -> "streaming-kafka-78912151",
"zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
"auto.offset.reset" -> "smallest"
)
val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
ssc, // 给定SparkStreaming上下文
kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
topics, // 给定读取对应topic的名称以及读取数据的线程数量
StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
).map(_._2) val resultWordCount = dstream
.filter(line => line.nonEmpty)
.flatMap(line => line.split(" ").map((_, 1)))
.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(15), // 窗口大小
Seconds(10) // 滑动大小
)
resultWordCount.print() // 这个也是打印数据 dstream.foreachRDD(rdd => {
// TODO: 这里就可以做数据输出的代码编写
// TODO: 这里不要为空
rdd.foreachPartition(iter => {
// TODO: 这里在实际环境中不要为空,为空可能会出现一些问题:内存泄露的问题
println(iter.take(1))
})
}) // 启动开始处理
ssc.start()
ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
}
}
6.注意点
一个批次,DStream内部就只对应一个RDD,transform和foreachRDD API使用的过程中,不要考虑多个RDD的问题