1. Introduction
Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant processing of live data streams.
Common data sources: Kafka, Flume, HDFS, TCP sockets, files, etc.
Processed results can be pushed to: HDFS, databases, and live dashboards.
Spark Streaming provides a rich set of high-level operators, so data can be processed with complex algorithms without hand-writing the low-level plumbing.
Spark Streaming receives live input data streams and divides the data into batches; the Spark engine then processes each batch and produces the results as a stream of batches as well. In effect, Spark Streaming performs micro-batch processing.
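Because each micro-batch is just an RDD handed to the regular Spark engine, the batch boundary can be made visible directly. The following is a minimal sketch, not one of the examples below; the localhost:9999 source and the 3-second interval are arbitrary illustration choices:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MicroBatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("micro-batch-sketch")
    // The batch interval (3 seconds here) is what slices the continuous stream into batches
    val ssc = new StreamingContext(conf, Seconds(3))
    val lines = ssc.socketTextStream("localhost", 9999)
    // Every batch interval, the data received so far arrives as an ordinary RDD
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contains ${rdd.count()} lines")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}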
2. Application Examples
Maven pom dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.6</version>
</dependency>
Example 1: Word count over socket data (Scala)
Read data from port 7777, implement word count with Spark Streaming, and print the results to the console.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf; local[*] provides enough threads for both the receiver and the processing
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstream")
    // Instantiate the StreamingContext
    // The 3-second batch interval determines how often a new batch is produced
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // Define the source: read text lines from the socket
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("192.168.136.20", 7777)
    val wordStream: DStream[String] = socketLineStream.flatMap(line => line.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map(x => (x, 1))
    val wordcountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    // Print the counts for each batch
    wordcountStream.print()
    // Start the receiver and the processing
    streamingContext.start()
    // Wait for the processing to stop
    streamingContext.awaitTermination()
  }
}
Open the port with nc and type some test data:
nc -lk 7777
Example 2: Word count over socket data (Java)
Read data from port 7777, implement word count with Spark Streaming, and print the results to the console.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkStreamJavaDemo1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkstream");
        // The 3-second batch interval determines how often a new batch is produced
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(3));
        // Define the source: read text lines from the socket
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("192.168.136.20", 7777);
        // Word-count logic
        JavaDStream<String> fm = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split).iterator();
            }
        });
        JavaPairDStream<String, Integer> pair = fm.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairDStream<String, Integer> reduceByKey = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Print the counts for each batch
        reduceByKey.print();
        // Start the receiver and the processing
        jsc.start();
        // Wait for the processing to stop
        try {
            jsc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Example 3: Word count over files in a directory
Monitor the D:\sparkStreaming directory with textFileStream; only files that appear in the directory after the job starts are processed.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamFileDataSourceDemo2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("fileDataSource")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // Source: monitor the directory for newly added files
    val fileDStream: DStream[String] = streamingContext.textFileStream("D:\\sparkStreaming")
    val wordStream: DStream[String] = fileDStream.flatMap(line => line.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
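To feed this example, a reliable approach is to write the file somewhere else first and then move it into the monitored directory, so the stream sees a complete new file. A minimal sketch (the D:\staging path and the file contents are arbitrary illustration choices):
import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}

object DropTestFile {
  def main(args: Array[String]): Unit = {
    // Write the test data to a staging location first
    val staged = Paths.get("D:\\staging\\words.txt")
    Files.createDirectories(staged.getParent)
    Files.write(staged, "hello spark hello streaming".getBytes("UTF-8"), StandardOpenOption.CREATE)
    // Then move it into the monitored directory so textFileStream picks it up as a new file
    Files.move(staged, Paths.get("D:\\sparkStreaming\\words.txt"), StandardCopyOption.ATOMIC_MOVE)
  }
}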
Example 4: Word count over Kafka data
Spark Streaming reads data from Kafka and maintains a running (stateful) word count.
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
    // updateStateByKey requires a checkpoint directory to hold the state
    streamingContext.checkpoint("in/checkpoint")
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    // Source: read directly from Kafka
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("SparkKafkaDemo"), kafkaParams) // SparkKafkaDemo is the Kafka topic
    )
    val wordStream: DStream[String] = kafkaStream.flatMap(v => v.value().toString.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    // Stateless: counts within each batch only
    // val sumStream: DStream[(String, Int)] = mapStream.reduceByKey(_+_)
    // Stateful: running count across batches
    val sumStream: DStream[(String, Int)] = mapStream.updateStateByKey {
      case (seq, buffer) =>
        // seq holds the new values for this key in the current batch; buffer holds the previously accumulated count
        // e.g. (CompactBuffer(1),1,5): 5 is the existing count, 1 arrived in this batch, so 6 is emitted
        println(seq, seq.sum, buffer.getOrElse(0))
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
    }
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Start a console producer to send test messages:
kafka-console-producer.sh --topic SparkKafkaDemo --broker-list 192.168.136.20:9092
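If you only need counts over a recent window of time rather than a running total since the job started, a windowed reduce is an alternative to updateStateByKey. The snippet below is a hedged sketch that slots into the example above in place of the updateStateByKey block; the 30-second window and 10-second slide are arbitrary choices, but both must be multiples of the 5-second batch interval:
// Count words over the last 30 seconds, recomputed every 10 seconds
// (no running state across the whole job is kept, only within the window)
val windowedCounts: DStream[(String, Int)] =
  mapStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()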
Example 5: Custom receiver
A custom receiver must extend the Receiver class.
import java.io.{BufferedReader, InputStreamReader}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Custom receiver
class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  var socket: java.net.Socket = null

  def receive(): Unit = {
    socket = new java.net.Socket(host, port)
    val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    // Note: an assignment evaluates to Unit in Scala, so the Java-style
    // while ((line = reader.readLine()) != null) idiom does not work here
    var line: String = reader.readLine()
    while (line != null) {
      // Simple receiver logic: stop receiving when the message is "end", otherwise store the line
      if (line.equals("end")) {
        return
      } else {
        this.store(line)
      }
      line = reader.readLine()
    }
  }

  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()
  }

  override def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}

object MyReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("myreceiverdemo").setMaster("local[*]")
    val streamingContext: StreamingContext = new StreamingContext(conf, Seconds(5))
    // Use the custom receiver as the source
    val receiverStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver("192.168.136.20", 7777))
    val lineStream: DStream[String] = receiverStream.flatMap(line => line.split("\\s+"))
    val wordStream: DStream[(String, Int)] = lineStream.map((_, 1))
    val sumStream: DStream[(String, Int)] = wordStream.reduceByKey(_ + _)
    sumStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
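The receiver above exits silently if the connection fails or drops. The usual pattern, following the custom-receiver example in the Spark documentation, is to catch errors in the receiving thread and call restart(), which asks Spark to stop and re-launch the receiver. A hedged sketch of a more defensive variant (the class name RestartingReceiver is introduced here only for illustration):
import java.io.{BufferedReader, InputStreamReader}
import java.net.ConnectException

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class RestartingReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // The receiving thread ends on its own once isStopped() becomes true or the socket closes
  override def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new java.net.Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // The server closed the connection; ask Spark to restart the receiver
      restart("Trying to connect again")
    } catch {
      case e: ConnectException => restart(s"Error connecting to $host:$port", e)
      case t: Throwable        => restart("Error receiving data", t)
    }
  }
}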