大数据学习(29)—— Spark Streaming

Spark提供了DataFrame和DataSet API来处理批量数据,它们把数据转换成RDD,在内存中以迭代器的方式不落盘处理,所以效率很高。但它有一个弊端,就是不能准实时计算数据变化。

为了解决上述问题,Spark引入了Spark Stream来处理准流式数据。为啥说准流式呢?因为它本质上还是批处理,只不过这个批相当小,是微批,接近流式计算,以秒级的时间窗口来处理一批数据。 

接下来,老规矩,看官网Spark Streaming

概述

Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等。最后,Spark Streaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。实际上,你完全可以将Spark的机器学习(machine learning) 和 图计算(graph processing)的算法应用于Spark Streaming的数据流当中。

大数据学习(29)—— Spark Streaming

这个图片是对上述文字的可视化展现。从数据源获取数据,加工处理后存储起来或者展示。

下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。

大数据学习(29)—— Spark Streaming

 

数据流经过Spark Streaming的窗口之后,就变成了微批,对微批的处理也是RDD的DAG,处理完毕变成批处理结果,再加上你的处理逻辑就可以存储或者展示了。

Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDD。

Spark Streaming架构

大数据学习(29)—— Spark Streaming

  • Master:主要负责整体集群资源的管理和应用程序调度,上图中没有画出来
  • Worker:负责单个节点的资源管理,driver 和 executor 的启动等
  • Driver:用户入口程序执行的地方,即 SparkContext 执行的地方,主要是 DAG 生成、stage 划分、task 生成及调度
  • Executor:负责执行 task,反馈执行状态和执行结果。

离散数据流 (DStreams)

离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集,每个RDD都包含了特定时间间隔内的一批数据,如下图所示。

大数据学习(29)—— Spark Streaming

 

任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个RDD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。如下所示。

大数据学习(29)—— Spark Streaming

底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。

输入DStream和接收器

输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器(Receiver)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。

Spark Streaming主要提供两种内建的流式数据源:

  • 基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
  • 高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。

注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。所以,在运行Spark Streaming应用的时候,需要注意分配足够的CPU core(本地运行时,需要足够的线程)来处理接收到的数据,同时还要足够的CPU core来运行这些接收器。

注意

  • 如果本地运行Spark Streaming应用,记得不能将master设为”local” 或 “local[1]”。这两个值都只会在本地启动一个线程。而如果此时你使用一个包含接收器(如:套接字、Kafka、Flume等)的输入DStream,那么这一个线程只能用于运行这个接收器,而处理数据的逻辑就没有线程来执行了。因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
  • 将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。

实战

下面我将用一段Java代码向本级端口发送数据,Spark Streaming从该端口接收数据并在时间窗口内统计单词个数。

首先引入Spark Streming的maven依赖。

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>3.1.2</version>
      </dependency>

接着编写一段发送数据的Java程序。

public class SendMessage {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            int i = 0;

            public void run() {
                try {

                    //创建一个serversocket绑定的端口:6666
                    ServerSocket s = new ServerSocket(6666);

                    Socket client = s.accept();

                    //获取输出流
                    OutputStream out = client.getOutputStream();

                    while(true) {
                        i++;
                        //向6666端口发送数据hello n
                        out.write(new String("hello " + i+"\n").getBytes());
                        sleep(1000);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

然后再用scala写一段Spark Streaming程序来接收并处理数据。

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[3]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")  //设置输出日志级别
    val ssc = new StreamingContext(sc, Seconds(2)) //两秒一个微批

    val ds = ssc.socketTextStream("localhost",6666)
    val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(10),Seconds(6))  //10秒的窗口,每6秒滑动一次
    
    result.print()
    ssc.start()

    ssc.awaitTermination()
  }
}

首先启动Socket服务端,并写入数据。接着启动spark streaming程序,控制台输出如下。稳定之后,每个窗口处理5个批次的数据,有10个hello。

-------------------------------------------
Time: 1632630350000 ms
-------------------------------------------
(3,1)
(4,1)
(hello,5)
(1,1)
(5,1)
(2,1)

-------------------------------------------
Time: 1632630356000 ms
-------------------------------------------
(6,1)
(9,1)
(3,1)
(4,1)
(7,1)
(hello,10)
(10,1)
(8,1)
(5,1)
(2,1)
(11,1)

-------------------------------------------
Time: 1632630362000 ms
-------------------------------------------
(15,1)
(9,1)
(12,1)
(16,1)
(hello,10)
(13,1)
(10,1)
(8,1)
(14,1)
(17,1)
(11,1)
上一篇:spark的转换算子及一个案例


下一篇:DStream以及基本工作原理