Implementing Spark Streaming in Scala and Java
- Spark Streaming real-time WordCount on data from a socket port
- Spark Streaming real-time WordCount on data from a monitored directory
- Using Spark Streaming together with Kafka
- Custom receiver
Spark Streaming Real-Time WordCount on Socket Port Data
Scala implementation of Spark Streaming
- Add the Maven dependencies
<dependencies>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.6</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <!--<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>-->
    <version>2.4.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
  </dependency>
</dependencies>
- Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamDemo1")
    // batch interval: data is collected every 3 seconds
    val streamingContext = new StreamingContext(sparkConf, Seconds(3))
    // specify the input source: a socket text stream
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.136.100", 7777)
    // process the collected data: word count
    val wordStream: DStream[String] = socketLineStream.flatMap(line => line.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map(x => (x, 1))
    val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    // print each batch's result
    wordcountStream.print()
    // start the receiver
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- Start the port service on the Linux side
[root@hadoop100 ~]# nc -lk 7777
Note: be sure to start the port service first and then the Scala program; otherwise the program will fail right away.
- Start the Scala program
- Type words at the port
hello world
hello spark
hello scala
hello scala
- The word counts are printed on the Scala console accordingly
-------------------------------------------
Time: 1608454200000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1608454203000 ms
-------------------------------------------
(hello,1)
(spark,1)
-------------------------------------------
Time: 1608454206000 ms
-------------------------------------------
-------------------------------------------
Time: 1608454209000 ms
-------------------------------------------
(hello,1)
(scala,1)
-------------------------------------------
Time: 1608454212000 ms
-------------------------------------------
(hello,1)
(scala,1)
Data is collected once per the configured batch interval (every 3 seconds here); in essence, Spark Streaming is micro-batch processing.
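To make the batch boundary visible, foreachRDD exposes the RDD behind each micro-batch together with its batch time. A minimal sketch (an optional addition, not in the original program) that could be placed before streamingContext.start(), reusing wordcountStream from the program above:
// optional debug output: one line per micro-batch, printed on the driver
wordcountStream.foreachRDD { (rdd, time) =>
  println(s"batch at ${time.milliseconds} ms contains ${rdd.count()} distinct words")
}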
Java implementation of Spark Streaming
- Write the Java program
package nj.zb.kb09.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkStreamJavaDemo1 {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamJavaDemo1");
        // batch interval of 5 seconds
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.136.100", 7777);
        // split each line into words
        JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split).iterator();
            }
        });
        // map each word to (word, 1)
        JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // sum the counts per word within each batch
        JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        reduceByKey.print();
        jsc.start();
        jsc.awaitTermination();
    }
}
- Start the Java program
- Open the port on the Linux side
[root@hadoop100 ~]# nc -lk 7777
- Type at the port
hello world
hello scala
hello scala
hello scala
hello world
- The word counts are printed on the Java console accordingly
-------------------------------------------
Time: 1608454655000 ms
-------------------------------------------
(hello,2)
(world,1)
(scala,1)
-------------------------------------------
Time: 1608454660000 ms
-------------------------------------------
(hello,1)
(scala,1)
-------------------------------------------
Time: 1608454665000 ms
-------------------------------------------
(hello,1)
(scala,1)
Spark Streaming Real-Time WordCount on Directory Data
- Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamFileDataSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamFileDataSource")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    // monitor the in/test/ directory for newly added text files
    val fileDStream: DStream[String] = streamingContext.textFileStream("in/test/")
    val wordStream: DStream[String] = fileDStream.flatMap(x => x.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- Start the Scala program
- Move a file a.txt into the monitored directory
hello world
hello java
hello scala
hello spark
hello java
hello java
- The results are printed on the console accordingly
-------------------------------------------
Time: 1608549950000 ms
-------------------------------------------
-------------------------------------------
Time: 1608549955000 ms
-------------------------------------------
-------------------------------------------
Time: 1608549960000 ms
-------------------------------------------
-------------------------------------------
Time: 1608549965000 ms
-------------------------------------------
(hello,6)
(java,3)
(world,1)
(scala,1)
(spark,1)
-------------------------------------------
Time: 1608549970000 ms
-------------------------------------------
Note: the principle is that the specified directory is monitored as a data stream; while the stream is running, any new text file that is moved into the directory is detected (which is exactly the streaming view: the directory is treated as a stream of incoming files).
Be aware of the following (it is easy to get stuck here): files that already existed in the directory before monitoring started are not read. Only files that arrive during monitoring are picked up, and a file is only detected if its last modification (save) happened after the moment monitoring started.
In practice this means that, after the stream has started, you should first touch the file you want to feed in (for example, open it, add a space or a newline that does not change its content, and save it), and only then move it into the monitored directory; otherwise it will not be detected. Presumably the intent is to monitor only files modified after the stream started. A minimal sketch of this workflow follows.
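As a minimal sketch (the helper object name and the in/tmp/ staging path are made up for illustration; it assumes the in/test/ directory watched by the program above): write the file somewhere else after the stream has started, then move it into the monitored directory, so its modification time falls inside the monitoring window.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object PushFileToWatchedDir {
  def main(args: Array[String]): Unit = {
    val staging = Paths.get("in/tmp/a.txt")   // hypothetical staging location outside the watched directory
    val target  = Paths.get("in/test/a.txt")  // the directory monitored by textFileStream
    Files.createDirectories(staging.getParent)
    // the file is written after the stream started, so its modification time is inside the monitoring window
    Files.write(staging, "hello world\nhello scala\n".getBytes(StandardCharsets.UTF_8))
    // move (rename) it into the watched directory so it appears there as a complete new file
    Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE)
  }
}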
Using Spark Streaming with Kafka
Stateless version: results are not carried over between batches (reduceByKey aggregates within each batch only)
- Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )
    val wordStream: DStream[String] = kafkaStream.flatMap(v => v.value().toString.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- Create the required topic
[root@hadoop100 ~]# kafka-topics.sh --zookeeper 192.168.136.100:2181 --create --topic sparkKafkaDemo --partitions 1 --replication-factor 1
- Start the Scala program
- Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
- Type data into the producer
hello world
hello java
hello
hello scala
hello spark
hello
hello
world
- The results are printed on the program's console accordingly
-------------------------------------------
Time: 1608551710000 ms
-------------------------------------------
-------------------------------------------
Time: 1608551715000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1608551720000 ms
-------------------------------------------
(hello,2)
(java,1)
-------------------------------------------
Time: 1608551725000 ms
-------------------------------------------
(hello,1)
(scala,1)
-------------------------------------------
Time: 1608551730000 ms
-------------------------------------------
(hello,3)
(spark,1)
-------------------------------------------
Time: 1608551735000 ms
-------------------------------------------
(world,1)
Stateful version: results accumulate across batches (updateStateByKey)
- Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )
    val wordStream: DStream[String] = kafkaStream.flatMap(v => v.value().toString.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    // the checkpoint directory is required for stateful operations
    streamingContext.checkpoint("checkpoint")
    // stateful: add each batch's counts to the state kept per key
    val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
      case (seq, buffer) => {
        // debug output: new values in this batch, their sum, and the previous state
        println(seq, seq.sum, buffer.getOrElse(0))
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }
    }
    stateSumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- Create the required checkpoint directory
- Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
- Start the Scala program
- Produce messages to sparkKafkaDemo
hello world
hello spark
hello scala
hello java
hello java
- The results are printed on the console accordingly; the (CompactBuffer(...), n, m) lines are the debug output of the println inside the update function (new values, their sum, previous state), interleaved with the regular batch output. A standalone sketch of the update function follows after this output.
-------------------------------------------
Time: 1608636755000 ms
-------------------------------------------
-------------------------------------------
Time: 1608636760000 ms
-------------------------------------------
(CompactBuffer(1),1,0)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636765000 ms
-------------------------------------------
(hello,1)
(spark,1)
(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636770000 ms
-------------------------------------------
(hello,1)
(spark,1)
(CompactBuffer(1, 1),2,1)
(CompactBuffer(1),1,0)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636775000 ms
-------------------------------------------
(hello,3)
(java,1)
(scala,1)
(spark,1)
(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,3)
(CompactBuffer(1),1,1)
-------------------------------------------
Time: 1608636780000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)
(CompactBuffer(),0,4)
(CompactBuffer(),0,2)
(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636785000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)
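Boiled down to a standalone function (the name updateWordCount and the explicit signature are illustrative, not from the original program), the update logic passed to updateStateByKey is: seq holds the new per-key counts of the current batch, and the state argument holds the running total restored from the checkpoint.
// seq: counts for one key in the current batch; state: the accumulated total so far
def updateWordCount(seq: Seq[Int], state: Option[Int]): Option[Int] = {
  Some(state.getOrElse(0) + seq.sum)   // returning None would drop the key from the state
}
// usage sketch: mapStream.updateStateByKey(updateWordCount)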
Custom Receiver
- Write the Scala program
package nj.zb.kb09.spark

import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  var socket: java.net.Socket = null

  def receive(): Unit = {
    socket = new java.net.Socket(host, port)
    val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    var line: String = reader.readLine()
    while (line != null) {
      if (line.equals("end")) {
        // stop collecting once the line "end" is read
        return
      } else {
        // hand the line over to Spark for processing
        this.store(line)
      }
      line = reader.readLine()
    }
  }

  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()
  }

  override def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}

object MyReceiverDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyReceiverDemo")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
    val receiverStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.136.100", 7777))
    val lineStream: DStream[String] = receiverStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = lineStream.map((_, 1))
    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
- Start the Scala program
- Start the port service
[root@hadoop100 ~]# nc -lk 7777
- Type data
hello
hello world
hello java
hello java
end
hello java
hello world
- The results are printed on the console accordingly
-------------------------------------------
Time: 1608564875000 ms
-------------------------------------------
-------------------------------------------
Time: 1608564880000 ms
-------------------------------------------
(hello,1)
-------------------------------------------
Time: 1608564885000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1608564890000 ms
-------------------------------------------
(hello,1)
(java,1)
-------------------------------------------
Time: 1608564895000 ms
-------------------------------------------
(hello,1)
(java,1)
-------------------------------------------
Time: 1608564900000 ms
-------------------------------------------
-------------------------------------------
Time: 1608564905000 ms
-------------------------------------------
-------------------------------------------
Time: 1608564910000 ms
-------------------------------------------
Note: you will find that after "end" is typed, none of the subsequent input is collected. That is because the receiver is written to stop collecting as soon as it reads "end": the batches keep being emitted, but no further input reaches them. A sketch of a receive method that keeps collecting by restarting the receiver follows.
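A minimal alternative sketch (not the original code) of receive() inside the same MyReceiver class: instead of returning when the stream ends, it calls restart() from the Receiver API, so Spark runs onStop() and onStart() again and collection continues, for example after the nc session is closed and reopened.
def receive(): Unit = {
  try {
    socket = new java.net.Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      store(line)                       // keep storing every line, with no special "end" marker
      line = reader.readLine()
    }
    restart("Connection closed, restarting receiver")
  } catch {
    case e: java.io.IOException => restart("Error receiving data", e)
  }
}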