1. File data source example
Requirement: read the Herry.txt file on HDFS and perform a word count.
package com.zch.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Author: zhaoHui
 * Date: 2022/01/06
 * Time: 14:29
 * Description:
 */
object sparkStreaming01_FileWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("StreamWordCount")
    // 2. Initialize the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // 3. Create a DStream that monitors the given HDFS path for new files
    val dirStream = ssc.textFileStream("hdfs://zhaohui01:8020/Herry.txt")
    // 4. Split each line into words
    val wordStreams = dirStream.flatMap(_.split("\t"))
    // 5. Map each word to a tuple (word, 1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    // 6. Sum the counts of identical words
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    // 7. Print the result of each batch
    wordAndCountStreams.print()
    // 8. Start the StreamingContext and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
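Note that textFileStream is designed to monitor a directory rather than re-read a single existing file: only files that appear under the monitored path after the job has started are picked up, and they should be moved or renamed into place atomically. A minimal variant that watches a directory instead, assuming a hypothetical hdfs://zhaohui01:8020/streaming/ input directory:

// Hypothetical input directory; only files added after ssc.start() are processed
val dirStream = ssc.textFileStream("hdfs://zhaohui01:8020/streaming/")
dirStream.flatMap(_.split("\t"))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()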
2. Creating a DStream from an RDD queue
package com.zch.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * Author: zhaoHui
 * Date: 2022/01/03
 * Time: 18:18
 * Description:
 */
object sparkStreaming02_Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("queue")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Create the RDD queue
    val rddQueue = new mutable.Queue[RDD[Int]]()
    // Create a QueueInputDStream from the queue
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream = inputStream.map((_, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    // Push an RDD into the queue every 2 seconds
    for (i <- 1 to 10) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    // Keep the application running after the queue has been filled
    ssc.awaitTermination()
  }
}
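With oneAtATime = false, every RDD waiting in the queue at the start of a batch interval is unioned and processed in that batch. A sketch of the one-RDD-per-batch alternative, assuming the same rddQueue:

// Consume exactly one queued RDD per batch interval instead of draining the queue
val oneAtATimeStream = ssc.queueStream(rddQueue, oneAtATime = true)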
3. Custom data source
To define a custom data source, extend Receiver and implement the onStart and onStop methods:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var flag = true

  override def onStart(): Unit = {
    // Generate a random value every 500 ms and hand it to Spark via store()
    new Thread(() => while (flag) {
      val string = "Collected data: " + new Random().nextInt(10).toString
      store(string)
      Thread.sleep(500)
    }).start()
  }

  override def onStop(): Unit = {
    flag = false
  }
}
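The receiver on its own does not start a job; a driver has to register it with receiverStream. A minimal sketch of such a driver, assuming the MyReceiver class above is on the classpath (the object name here is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkStreaming03_CustomReceiver {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("customReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Register the custom receiver and print whatever it stores
    val lineStream = ssc.receiverStream(new MyReceiver)
    lineStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}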
4. Kafka data source (key topic)
Requirement: read data from Kafka with Spark Streaming.
1. Add the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
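The _2.12 suffix has to match the project's Scala version, and 3.1.2 should match the Spark version already on the classpath; mismatched versions typically surface as binary-compatibility errors at runtime.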
2. Write the code
package com.zch.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Author: zhaoHui
 * Date: 2022/01/03
 * Time: 19:03
 * Description:
 */
object sparkStreaming04_Kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Kafka consumer configuration: brokers, consumer group, and deserializers
    val kafkaParam: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
        "zhaohui01:9092,zhaohui02:9092,zhaohui03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "__consumer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    // Create a direct stream that subscribes to the first01 topic
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first01"), kafkaParam)
    )
    // Print the value of each record
    kafkaDS.map(_.value()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
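As with the file source, the Kafka stream can feed the same word-count pipeline. A brief sketch that would replace the simple print() before ssc.start(), assuming the messages are tab-separated words:

// Word count over the Kafka record values (the split character is an assumption)
kafkaDS.map(_.value())
  .flatMap(_.split("\t"))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()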