Big Data — Spark Streaming in Scala and Java: Real-Time Stream Monitoring, Spark Streaming with Kafka, and a Custom Spark Streaming Receiver

Implementing Spark Streaming in Scala and Java

Spark Streaming: real-time WordCount on data from a monitored port

Scala implementation of Spark Streaming

  • Add the Maven dependencies (an equivalent sbt sketch follows the XML below)
<dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.6</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <!--<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>-->
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
  </dependencies>
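
All the Spark and Kafka artifacts above carry the _2.11 suffix, so the project must be compiled with Scala 2.11.x. For reference, a minimal sbt sketch of the same dependency set (assuming Scala 2.11.12; versions mirror the Maven block, jackson-databind and junit omitted):

// build.sbt — a hedged sketch mirroring the Maven dependencies above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.5",
  "org.apache.spark" %% "spark-streaming"            % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.5",
  "org.apache.spark" %% "spark-sql"                  % "2.4.5",
  "org.apache.kafka" %% "kafka"                      % "2.0.0",
  "org.apache.kafka" %  "kafka-streams"              % "2.0.0"
)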
  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamDemo1")

    // Batch interval: data is collected once every 3 seconds
    val streamingContext = new StreamingContext(sparkConf,Seconds(3))

    // Specify the input source: read text lines from a socket
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.136.100",7777)

    // Process the received data and count the words (WordCount)

    val wordStream: DStream[String] = socketLineStream.flatMap(line=>line.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map(x=>(x,1))

    val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    // Print each batch's result
    wordcountStream.print()


    // Start the receiver and the streaming job
    streamingContext.start()

    streamingContext.awaitTermination()
  }
}
  • Start the port service on the Linux side
[root@hadoop100 ~]# nc -lk 7777

Note: the port service must be started before the Scala program; otherwise the program fails immediately with a connection error.

  • Start the Scala program
  • Type words into the port
hello world
hello spark
hello scala
hello scala
  • The word counts are printed on the Scala console accordingly
-------------------------------------------
Time: 1608454200000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608454203000 ms
-------------------------------------------
(hello,1)
(spark,1)

-------------------------------------------
Time: 1608454206000 ms
-------------------------------------------

-------------------------------------------
Time: 1608454209000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608454212000 ms
-------------------------------------------
(hello,1)
(scala,1)

Data is collected once per batch interval (every 3 seconds here); in essence, Spark Streaming is micro-batch processing.
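
Because each batch is processed independently, the counts above reset at every interval. If counts spanning several recent batches are wanted, a window operation can be layered on the same pipeline. A minimal sketch, reusing mapStream from the program above (the 9-second window and 3-second slide are illustrative values; both must be multiples of the batch interval):

// Word counts over the last 9 seconds, recomputed every 3 seconds
val windowedCounts: DStream[(String, Int)] =
  mapStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(9), Seconds(3))

windowedCounts.print()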

Java implementation of Spark Streaming

  • Write the Java program
package nj.zb.kb09.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkStreamJavaDemo1 {
	public static void main(String[] args) throws InterruptedException {
		SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamJavaDemo1");

		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.136.100", 7777);

		JavaDStream<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public Iterator<String> call(String s) throws Exception {
				String[] split = s.split("\\s+");
				return Arrays.asList(split).iterator();
			}
		});
		JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String s) throws Exception {
				return new Tuple2<String, Integer>(s, 1);

			}
		});

		JavaPairDStream<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer integer, Integer integer2) throws Exception {
				return integer + integer2;
			}
		});
		reduceByKey.print();
		jsc.start();
		jsc.awaitTermination();
	}
}

  • Start the Java program
  • Open the port on the Linux side
[root@hadoop100 ~]# nc -lk 7777
  • Type into the port
hello world
hello scala
hello scala
hello scala
hello world
  • The word counts are printed on the Java console accordingly
-------------------------------------------
Time: 1608454655000 ms
-------------------------------------------
(hello,2)
(world,1)
(scala,1)

-------------------------------------------
Time: 1608454660000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608454665000 ms
-------------------------------------------
(hello,1)
(scala,1)

Spark Streaming: real-time WordCount on data from a monitored directory

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamFileDataSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamFileDataSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val fileDStream: DStream[String] = streamingContext.textFileStream("in/test/")

    val wordStream: DStream[String] = fileDStream.flatMap(x=>x.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()

    streamingContext.awaitTermination()
  }
}

  • Start the Scala program
  • Put the file a.txt into the monitored directory
hello world
hello java
hello scala
hello spark
hello java
hello java
  • The result is printed on the console accordingly
-------------------------------------------
Time: 1608549950000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549955000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549960000 ms
-------------------------------------------

-------------------------------------------
Time: 1608549965000 ms
-------------------------------------------
(hello,6)
(java,3)
(world,1)
(scala,1)
(spark,1)

-------------------------------------------
Time: 1608549970000 ms
-------------------------------------------

Note: the principle is simply to monitor the specified directory. While the job is running, any new file moved into the directory is detected (Spark Streaming is stream processing, so the directory can be treated as a data stream).
Be aware of a common stumbling block: files that already exist in the directory are not read. Only files that arrive while monitoring is running are picked up, and the file's last modification (save) must have happened after monitoring started.
In practice, to push an existing file into the monitored directory, open it after the job has started, make a trivial edit (a space or newline that does not change the content), save it, and only then move it into the directory; otherwise it will not be detected. The intent is presumably to pick up only files modified after monitoring began.
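
One way to feed a pre-existing file to the job is therefore to write a fresh copy into the monitored directory after streamingContext.start(), so that the copy's modification time falls inside the monitoring window. A minimal sketch (the source path is hypothetical; it assumes the copy receives a new modification timestamp, which is the usual behaviour when COPY_ATTRIBUTES is not requested):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Copy a.txt into the monitored in/test/ directory after the streaming job has started;
// the new copy gets a fresh modification time, so textFileStream can detect it
Files.copy(
  Paths.get("/data/a.txt"),           // hypothetical source file
  Paths.get("in/test/a.txt"),         // the directory monitored by the program above
  StandardCopyOption.REPLACE_EXISTING
)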

Using Spark Streaming with Kafka

Stateless: results from earlier batches are not carried over

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"),kafkaParams))

    val wordStream: DStream[String] = kafkaStream.flatMap(v=>v.value().toString.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
  • Create the required topic
[root@hadoop100 ~]# kafka-topics.sh --zookeeper 192.168.136.100:2181 --create --topic sparkKafkaDemo --partitions 1 --replication-factor 1
  • Start the Scala program
  • Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
  • Type data into the producer
hello world
hello java
hello
hello scala
hello spark
hello
hello
world


  • The results are printed on the program's console accordingly
-------------------------------------------
Time: 1608551710000 ms
-------------------------------------------

-------------------------------------------
Time: 1608551715000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608551720000 ms
-------------------------------------------
(hello,2)
(java,1)

-------------------------------------------
Time: 1608551725000 ms
-------------------------------------------
(hello,1)
(scala,1)

-------------------------------------------
Time: 1608551730000 ms
-------------------------------------------
(hello,3)
(spark,1)

-------------------------------------------
Time: 1608551735000 ms
-------------------------------------------
(world,1)


Stateful: results from earlier batches are carried over

  • Write the Scala program
package nj.zb.kb09.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamKafkaSource")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.100:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"),kafkaParams))

    val wordStream: DStream[String] = kafkaStream.flatMap(v=>v.value().toString.split("\\s+"))

    val mapStream: DStream[(String, Int)] = wordStream.map((_,1))

    // updateStateByKey needs a checkpoint directory to persist the state between batches
    streamingContext.checkpoint("checkpoint")

    // Stateful word count: merge this batch's values (seq) with the accumulated total (buffer)
    val stateSumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
      case (seq, buffer) => {
        println(seq, seq.sum, buffer.getOrElse(0))
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }
    }

    stateSumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
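
The function passed to updateStateByKey receives, for each key, the Seq of values that arrived in the current batch and the previously stored state as an Option, and returns the new state. Since Spark 1.6 the same running total can also be written with mapWithState, which exposes the state object explicitly; a minimal sketch, assuming the same mapStream and checkpoint directory as above:

import org.apache.spark.streaming.{State, StateSpec}

// For each word, add this batch's count to the stored running total and emit (word, total)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val stateStream: DStream[(String, Int)] =
  mapStream.mapWithState(StateSpec.function(mappingFunc))

stateStream.print()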

  • Create the required checkpoint directory


  • Start a console producer for sparkKafkaDemo
[root@hadoop100 ~]# kafka-console-producer.sh --topic sparkKafkaDemo --broker-list 192.168.136.100:9092
  • Start the Scala program
  • Produce messages to sparkKafkaDemo
hello world
hello spark
hello scala
hello java
hello java
  • The results are printed on the console accordingly
-------------------------------------------
Time: 1608636755000 ms
-------------------------------------------

-------------------------------------------
Time: 1608636760000 ms
-------------------------------------------

(CompactBuffer(1),1,0)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636765000 ms
-------------------------------------------
(hello,1)
(spark,1)

(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636770000 ms
-------------------------------------------
(hello,1)
(spark,1)

(CompactBuffer(1, 1),2,1)
(CompactBuffer(1),1,0)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,0)
-------------------------------------------
Time: 1608636775000 ms
-------------------------------------------
(hello,3)
(java,1)
(scala,1)
(spark,1)

(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
(CompactBuffer(1),1,3)
(CompactBuffer(1),1,1)
-------------------------------------------
Time: 1608636780000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)

(CompactBuffer(),0,4)
(CompactBuffer(),0,2)
(CompactBuffer(),0,1)
(CompactBuffer(),0,1)
-------------------------------------------
Time: 1608636785000 ms
-------------------------------------------
(hello,4)
(java,2)
(scala,1)
(spark,1)


Custom receiver

  • Write the Scala program
package nj.zb.kb09.spark

import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  var socket:java.net.Socket=null
  // Connect to the socket and store every received line until "end" is read
  def receive(): Unit = {
    socket = new java.net.Socket(host, port)
    val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    // Note: in Scala an assignment returns Unit, so the Java-style
    // while ((line = reader.readLine()) != null) idiom does not work here
    var line: String = reader.readLine()
    while (line != null) {
      if (line.equals("end")) {
        return
      } else {
        this.store(line)
      }
      line = reader.readLine()
    }
  }


  override def onStart(): Unit = {
    // Launch a background thread that runs the receive loop
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()
  }

  override def onStop(): Unit = {
    if(socket!=null){
      socket.close()
      socket=null
    }
  }
}
object MyReceiverDemo{
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyReceiverDemo")

    val streamingContext = new StreamingContext(sparkConf,Seconds(5))

    val receiverStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.136.100",7777))


    val lineStream: DStream[String] = receiverStream.flatMap(_.split("\\s+"))

    val mapStream: DStream[(String, Int)] = lineStream.map((_,1))

    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)

    sumStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

  • Start the Scala program
  • Start the port service
[root@hadoop100 ~]# nc -lk 7777
  • Type data
hello
hello world
hello java
hello java
end
hello java
hello world
  • The results are printed on the console accordingly
-------------------------------------------
Time: 1608564875000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564880000 ms
-------------------------------------------
(hello,1)

-------------------------------------------
Time: 1608564885000 ms
-------------------------------------------
(hello,1)
(world,1)

-------------------------------------------
Time: 1608564890000 ms
-------------------------------------------
(hello,1)
(java,1)

-------------------------------------------
Time: 1608564895000 ms
-------------------------------------------
(hello,1)
(java,1)

-------------------------------------------
Time: 1608564900000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564905000 ms
-------------------------------------------

-------------------------------------------
Time: 1608564910000 ms
-------------------------------------------

Note: after "end" is typed, none of the subsequent input is collected. This is because the receive loop is written to stop storing data once it reads "end": the streaming job itself keeps running, but the receiver no longer receives or stores any later input.
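
If collection should continue after the sentinel string, an alternative sketch is to call the Receiver's restart method instead of returning: Spark then calls onStop() followed by onStart() again, so the socket is reconnected. Only the branch inside receive() changes (hedged; based on the MyReceiver class above):

// Instead of permanently stopping collection on "end",
// ask Spark to restart this receiver in the background
if (line.equals("end")) {
  restart("received end marker, restarting receiver")
} else {
  this.store(line)
}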
