概述
- 流式计算框架,类似Storm
- 严格来说不是真正的流式计算(实时计算),而是把连续的数据当做不连续的RDD处理,本质是离散计算
- Flink:和 Spark Streaming 相反,把离散数据当成流式数据处理
基础
- 易用,已经集成在Spark中
- 容错性,底层也是RDD
- 支持Java、Scala、Python
WordCount
- nc -l -p 1234
- bin/run-example streaming.NetworkWordCount localhost 1234
- cpu核心数必须>1,不记录之前的状态
1 import org.apache.spark.SparkConf 2 import org.apache.spark.storage.StorageLevel 3 import org.apache.spark.streaming.{Seconds, StreamingContext} 4 5 // 创建一个StreamingContext,创建一个DSteam(离散流) 6 // DStream表现形式:RDD 7 // 使用DStream把连续的数据流变成不连续的RDD 8 object MyNetworkWordCount { 9 def main(args: Array[String]): Unit = { 10 // 创建一个StreamingContext对象,以local模式为例 11 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 12 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 13 14 // 两个参数:1.conf 和 2.采样时间间隔:每隔3s 15 val ssc = new StreamingContext(conf,Seconds(3)) 16 17 // 创建DStream,从netcat服务器接收数据 18 val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY) 19 20 // 进行单词计数 21 val words = lines.flatMap(_.split(" ")) 22 23 // 计数 24 val wordCount = words.map((_,1)).reduceByKey(_+_) 25 26 // 打印结果 27 wordCount.print() 28 29 // 启动StreamingContext,进行计算 30 ssc.start() 31 32 // 等待任务结束 33 ssc.awaitTermination() 34 } 35 }
高级特性
- 什么是DStream:离散流,把连续的数据流变成不连续的RDD
- transform
- updateStateByKey(func):累加之前的结果,设置检查点,把之前的结果保存到检查点目录下
- hdfs dfs -mkdir -p /day0614/ckpt
- hdfs dfs -ls /day0614/ckpt
1 import org.apache.spark.SparkConf 2 import org.apache.spark.storage.StorageLevel 3 import org.apache.spark.streaming.{Seconds, StreamingContext} 4 5 // 创建一个StreamingContext,创建一个DSteam(离散流) 6 // DStream表现形式:RDD 7 // 使用DStream把连续的数据流变成不连续的RDD 8 object MyTotalNetworkWordCount { 9 def main(args: Array[String]): Unit = { 10 // 创建一个StreamingContext对象,以local模式为例 11 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 12 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 13 14 // 两个参数:1.conf 和 2.采样时间间隔:每隔3s 15 val ssc = new StreamingContext(conf,Seconds(3)) 16 17 // 设置检查点目录,保存之前状态 18 ssc.checkpoint("hdfs://192.168.174.111:9000/day0614/ckpt") 19 20 // 创建DStream,从netcat服务器接收数据 21 val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY) 22 23 // 进行单词计数 24 val words = lines.flatMap(_.split(" ")) 25 26 // 计数 27 val wordPair = words.map(w => (w,1)) 28 29 // 定义值函数 30 // 两个参数:1.当前的值 2.之前的结果 31 val addFunc = (curreValues:Seq[Int],previousValues:Option[Int])=>{ 32 // 把当前序列进行累加 33 val currentTotal = curreValues.sum 34 35 // 在之前的值上再累加 36 // 如果之前没有值,返回0 37 Some(currentTotal + previousValues.getOrElse(0)) 38 } 39 40 // 累加计算 41 val total = wordPair.updateStateByKey(addFunc) 42 43 total.print() 44 45 ssc.start() 46 47 ssc.awaitTermination() 48 49 } 50 }
- 窗口操作
- 只统计在窗口中的数据
- Exception in thread "main" java.lang.Exception: The slide duration of windowed DStream (10000 ms) must be a multiple of the slide duration of parent DStream (3000 ms)
- 滑动距离必须是采样频率的整数倍
1 package day0614 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.SparkConf 5 import org.apache.spark.storage.StorageLevel 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 8 9 object MyNetworkWordCountByWindow { 10 def main(args: Array[String]): Unit = { 11 // 不打印日志 12 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) 13 Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 14 // 创建一个StreamingContext对象,以local模式为例 15 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 16 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 17 18 // 两个参数:1.conf 和 2.采样时间间隔:每隔3s 19 val ssc = new StreamingContext(conf,Seconds(3)) 20 21 // 创建DStream,从netcat服务器接收数据 22 val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY) 23 24 // 进行单词计数 25 val words = lines.flatMap(_.split(" ")).map((_,1)) 26 27 // 每9s,把过去30s的数据进行WordCount 28 // 参数:1.操作 2.窗口大小 3.窗口滑动距离 29 val result = words.reduceByKeyAndWindow((x:Int,y:Int)=>(x+y),Seconds(30),Seconds(9)) 30 31 result.print() 32 ssc.start() 33 ssc.awaitTermination() 34 } 35 }
- 集成Spark SQL
- 使用SQL语句分析流式数据
1 package day0614 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.SparkConf 5 import org.apache.spark.sql.SparkSession 6 import org.apache.spark.storage.StorageLevel 7 import org.apache.spark.streaming.{Seconds, StreamingContext} 8 9 object MyNetworkWordCountWithSQL { 10 def main(args: Array[String]): Unit = { 11 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) 12 Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 13 // 创建一个StreamingContext对象,以local模式为例 14 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 15 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 16 17 // 两个参数:1.conf 和 2.采样时间间隔:每隔3s 18 val ssc = new StreamingContext(conf,Seconds(3)) 19 20 // 创建DStream,从netcat服务器接收数据 21 val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY) 22 23 // 进行单词计数 24 val words = lines.flatMap(_.split(" ")) 25 26 // 集成Spark SQL,使用SQL语句进行WordCount 27 words.foreachRDD(rdd=> { 28 // 创建SparkSession对象 29 val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() 30 31 // 把rdd转成DataFrame 32 import spark.implicits._ 33 val df1 = rdd.toDF("word") // 表df1:只有一个列"word" 34 35 // 创建视图 36 df1.createOrReplaceTempView("words") 37 38 // 执行SQL,通过SQL执行WordCount 39 spark.sql("select word,count(*) from words group by word").show 40 }) 41 42 ssc.start() 43 ssc.awaitTermination() 44 } 45 }
数据源
- 基本数据源
- 文件流
1 import org.apache.log4j.{Level, Logger} 2 import org.apache.spark.SparkConf 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.{Seconds, StreamingContext} 5 6 object FileStreaming { 7 def main(args: Array[String]): Unit = { 8 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) 9 Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 10 // 创建一个StreamingContext对象,以local模式为例 11 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 12 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 13 14 // 两个参数:1.conf 和 2.采样时间间隔:每隔3s 15 val ssc = new StreamingContext(conf,Seconds(3)) 16 17 // 直接监控本地的某个目录,如果有新的文件产生,就读取进来 18 val lines = ssc.textFileStream("F:\\idea-workspace\\temp") 19 20 lines.print() 21 ssc.start() 22 ssc.awaitTermination() 23 } 24 }
-
- RDD队列流
1 import org.apache.log4j.{Level, Logger} 2 import org.apache.spark.SparkConf 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 import scala.collection.mutable.Queue 7 8 object RDDQueueStream { 9 def main(args: Array[String]): Unit = { 10 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) 11 Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 12 // 创建一个StreamingContext对象,以local模式为例 13 // 保证CPU核心>=2,setMaster("[2]"),开启两个线程 14 val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") 15 16 // 两个参数:1.conf 和 2.采样时间间隔:每隔1s 17 val ssc = new StreamingContext(conf,Seconds(1)) 18 19 // 创建队列,作为数据源 20 val rddQueue = new Queue[RDD[Int]]() 21 for(i<-1 to 3){ 22 rddQueue += ssc.sparkContext.makeRDD(1 to 10) 23 // 睡1s 24 Thread.sleep(1000) 25 } 26 27 // 从队列中接收数据,创建DStream 28 val inputDStream = ssc.queueStream(rddQueue) 29 30 // 处理数据 31 val result = inputDStream.map(x=>(x,x*2)) 32 result.print() 33 34 ssc.start() 35 ssc.awaitTermination() 36 } 37 }
-
- 套接字流(socketTextStream)
- 高级数据源
- Flume
- Kafka
Kafka
- 概述
- 一种高吞吐量的分布式发布订阅消息系统
- 消息类型:主体Topic(广播)、队列Queue(一对一)
- 消息系统类型:同步消息系统、异步消息系统
- 常见消息产品:Redis、Kafka、JMS
- 安装
- config/server.properties
- bin/kafka-server-start.sh config/server.properties &
- bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic mydemo1
- bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mydemo1
- bin/kafka-console-consumer.sh --zookeeper bigdata111:2181 --topic mydemo1
性能优化参数