Spark提供了DataFrame和DataSet API来处理批量数据,它们把数据转换成RDD,在内存中以迭代器的方式不落盘处理,所以效率很高。但它有一个弊端,就是不能准实时计算数据变化。
为了解决上述问题,Spark引入了Spark Stream来处理准流式数据。为啥说准流式呢?因为它本质上还是批处理,只不过这个批相当小,是微批,接近流式计算,以秒级的时间窗口来处理一批数据。
接下来,老规矩,看官网Spark Streaming
概述
Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等。最后,Spark Streaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。实际上,你完全可以将Spark的机器学习(machine learning) 和 图计算(graph processing)的算法应用于Spark Streaming的数据流当中。
这个图片是对上述文字的可视化展现。从数据源获取数据,加工处理后存储起来或者展示。
下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。
数据流经过Spark Streaming的窗口之后,就变成了微批,对微批的处理也是RDD的DAG,处理完毕变成批处理结果,再加上你的处理逻辑就可以存储或者展示了。
Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDD。
Spark Streaming架构
- Master:主要负责整体集群资源的管理和应用程序调度,上图中没有画出来
- Worker:负责单个节点的资源管理,driver 和 executor 的启动等
- Driver:用户入口程序执行的地方,即 SparkContext 执行的地方,主要是 DAG 生成、stage 划分、task 生成及调度
- Executor:负责执行 task,反馈执行状态和执行结果。
离散数据流 (DStreams)
离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集,每个RDD都包含了特定时间间隔内的一批数据,如下图所示。
任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。如下所示。
底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。
输入DStream和接收器
输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器(Receiver)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。
Spark Streaming主要提供两种内建的流式数据源:
- 基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
- 高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。
注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。所以,在运行Spark Streaming应用的时候,需要注意分配足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。
注意
- 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
- 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。
实战
下面我将用一段Java代码向本级端口发送数据,Spark Streaming从该端口接收数据并在时间窗口内统计单词个数。
首先引入Spark Streming的maven依赖。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.1.2</version> </dependency>
接着编写一段发送数据的Java程序。
public class SendMessage { public static void main(String[] args) { new Thread(new Runnable() { int i = 0; public void run() { try { //创建一个serversocket绑定的端口:6666 ServerSocket s = new ServerSocket(6666); Socket client = s.accept(); //获取输出流 OutputStream out = client.getOutputStream(); while(true) { i++; //向6666端口发送数据hello n out.write(new String("hello " + i+"\n").getBytes()); sleep(1000); } } catch (IOException e) { e.printStackTrace(); } } }).start(); } }
然后再用scala写一段Spark Streaming程序来接收并处理数据。
object StreamingTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local[3]") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") //设置输出日志级别 val ssc = new StreamingContext(sc, Seconds(2)) //两秒一个微批 val ds = ssc.socketTextStream("localhost",6666) val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(10),Seconds(6)) //10秒的窗口,每6秒滑动一次 result.print() ssc.start() ssc.awaitTermination() } }
首先启动Socket服务端,并写入数据。接着启动spark streaming程序,控制台输出如下。稳定之后,每个窗口处理5个批次的数据,有10个hello。
------------------------------------------- Time: 1632630350000 ms ------------------------------------------- (3,1) (4,1) (hello,5) (1,1) (5,1) (2,1) ------------------------------------------- Time: 1632630356000 ms ------------------------------------------- (6,1) (9,1) (3,1) (4,1) (7,1) (hello,10) (10,1) (8,1) (5,1) (2,1) (11,1) ------------------------------------------- Time: 1632630362000 ms ------------------------------------------- (15,1) (9,1) (12,1) (16,1) (hello,10) (13,1) (10,1) (8,1) (14,1) (17,1) (11,1)