一、DStream 转换
DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。
1、无状态转化操作
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
-
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
-
例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
(1)Transform
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
案例一:参数为一个,即transformFunc: RDD[T] => RDD[U]
object Transform {
def main(args: Array[String]): Unit = {
//创建 SparkConfval sparkConf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建 StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//创建 DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoopwei", 9999)
//转换为 RDD 操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd =>
{
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
value
})
//打印
wordAndCountDStream.print
//启动
ssc.start()
ssc.awaitTermination()
}
}
案例二:参数为两个,即transformFunc: (RDD[T], Time) => RDD[U]
package cn.kgc.kb09.Spark
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkStreaming_Transform {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
val streamingContext = new StreamingContext(conf,Seconds(2))
val kafkaParams: Map[String, String] = Map(
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.247.201:9092"),
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup6")
)
val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
)
// 业务需求需要更改数据结构是可以使用transform完成转化工作
val numStream = kafkaStream.transform((rdd, timestamp) => {
val format = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
val time = format.format(timestamp.milliseconds)
val value =
rdd.flatMap(x => x.value().split("\\s+"))
.map(x => ((x, time), 1))
.reduceByKey((x,y) => {x+y})
.sortBy(x => x._2,ascending = false)
value
})
numStream.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
(2)join
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object JoinTest {
def main(args: Array[String]): Unit = {
// 1.创建 SparkConf
val sparkConf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("JoinTest")
// 2.创建 StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 3.从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] =
ssc.socketTextStream("hadoopwei", 9999)
val lineDStream2: ReceiverInputDStream[String] =
ssc.socketTextStream("haoopwei", 8888)
// 4.将两个流转换为 KV 类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))
// 5.流的 JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)
// 6.打印
joinDStream.print()
// 7.启动任务
ssc.start()
ssc.awaitTermination()
}
}
2、有状态转化操作
(1)UpdateStateByKey
-
UpdateStateByKey
原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()
为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 -
updateStateByKey()
的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 -
updateStateByKey
操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
-
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。
案例:
编写代码
package cn.kgc.kb09.Spark
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreamKafkaSource {
def main(args: Array[String]): Unit = {
// 创建SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
// 创建StreamingContext
val streamingContext = new StreamingContext(conf,Seconds(5))
// 设置checkpoint目录
streamingContext.checkpoint("checkpoint")
// 配置Kafka相关配置
val kafkaParams: Map[String, String] = Map(
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.247.201:9092"),
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup1")
)
// 通过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理
val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
)
// 对获取的数据进行处理
val wordStream = kafkaStream.flatMap(v => v.value().toString.split("\\s+"))
val mapStream = wordStream.map((_,1))
// 无状态
// val sumStream = mapStream.reduceByKey(_+_)
// sumStream.print()
// 有状态 hello,2 再输入hello,则返回(2,1)
// 前提条件:需要设置checkpoint
val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
case (seq, buffer) => {
println(seq, seq.sum, buffer.getOrElse(0))
val sum = buffer.getOrElse(0) + seq.sum
Option(sum)
}
}
// 打印结果
stateSumStream.print()
// 启动流计算
streamingContext.start()
streamingContext.awaitTermination()
}
}
开启生成者并生产消息
[root@hadoopwei ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic spakKafkaDemo>hello world>hello Scala>
二、DStream输出
1、输出操作
(1)输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动,会出现ERROR:ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
(2)输出操作如下:
-
print()
:在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print()。 -
saveAsTextFiles(prefix, [suffix])
:以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。 -
saveAsObjectFiles(prefix, [suffix])
:以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 -
saveAsHadoopFiles(prefix, [suffix])
:将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。 -
foreachRDD(func)
:这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。
(3)通用的输出操作foreachRDD()
,它用来对 DStream 中的 RDD 运行任意计算。这和transform()
有些类似,都可以让我们访问任意 RDD。在 foreachRDD()
中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
(4)注意:
-
和数据库等的JDBC连接不能写在 driver 层面(序列化原因)
-
如果写 foreach 则每个 RDD 中的每一条数据都创建连接,得不偿失;
-
增加 foreachPartition,在分区创建(获取)JDBC连接,则每个分区只创建一次。但可能会造成OOM即内存溢出。
-
foreach…等这些是没有返回值的,或者说返回值为空,也就不需要.print(),所以输出结果没有时间戳界面
object SparkWindowDemo8{
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafkaSource")
val streamingContext = new StreamingContext(conf,Seconds(2))
val kafkaParams: Map[String, String] = Map(
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.198.201:9092"),
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> " org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup8")
)
val kafkaStream: InputDStream[ConsumerRecord[String, Int]] = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams))
// foreachRDD,foreach,foreachPartition综合比较
println("driver")
kafkaStream.foreachRDD(
(rdd)=> { //针对每个rdd
println("bb") //进来一个rdd就会输出一次,而一个rdd即是一个批处理时间段内的分布式数据集合
// val array: Array[ConsumerRecord[String, Int]] = rdd.collect() //收集到Driver端,实际上是不好实现的,因为跨机器传输需要实现序列化
// if(array.length>=1){
// array.foreach(x=>{//x为每个executor上的数据)
// println("cc")
// x.value().toString.foreach(y=>
// println("dd:",y)
// )
// }
// )}
rdd.foreach(
x => { //每个批数据中的每一行数据
println("cc") //有n行数据就打印n次
val strings: Array[String] = x.value().toString.split("\\s+")
println(x.key(), strings.toList)
}
)
//推荐使用,但可能会内存溢出
// rdd.foreachPartition(
// x=>{//这里的x为rdd中的每个分区
// println("cc")
// x.foreach(
// y=>{ //每个分区的每条数据执行下面计算逻辑,由分区主导
// println("dd")
// println(y.value().toString.split("\\s+").toList)
// }
// )
// }
// )
}
)
streamingContext.start()
streamingContext.awaitTermination()
}
}
2、Driver端和Executor端执行逻辑
package cn.kgc.kb09.sparkstream
import java.text.SimpleDateFormat
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkWindowDemo8{
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafkaSource")
val streamingContext = new StreamingContext(conf,Seconds(2))
val kafkaParams: Map[String, String] = Map(
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.198.201:9092"),
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> " org.apache.kafka.common.serialization.StringDeserializer"),
(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup9")
)
val kafkaStream: InputDStream[ConsumerRecord[String, Int]] = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams))
println("driver") //主程序在driver端执行,只执行一次
val wordStream: DStream[String] = kafkaStream.flatMap( //同一个批处理时间内,输入n行数据,下方计算逻辑会执行n次
line => { //当取得流数据时,计算逻辑在各自executor上执行,driver端和多个executor端往往不在一个节点上
println("executor") //一个批处理时间内,有n行数据就执行n次
line.value().toString.split("\\s+")
}
)
wordStream.print()
// println("driver") //主程序只执行一次
// val wordStream: DStream[String] = kafkaStream.transform(
// (rdd) => { //每过一个采集周期都会传过来组成stream的RDD分布式数据集(注:这里的RDD是一个采集周期的批数据而形成的分布式数据集合)
// println("bb") //来一个RDD执行一次
// val value: RDD[String] = rdd.flatMap( //没有数据就不会进入flatMap
// x => { //这里的x是对应于这个分布式数据集的每一行数据
// println("cc") //计算逻辑在executor中执行,当有数据后才执行,有多少行执行多少次
// x.value().toString.split("\\s+")
// }
// )
// value
// }
// )
// wordStream.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
参考文献:
https://blog.csdn.net/u012387141/article/details/105808291
https://www.cnblogs.com/hyunbar/p/12069043.html