SparkStreaming

SparkStreaming(1) ~ SparkStreaming编程指南

之所以写这部分内容的原因是, 无论是网络上可以直接找到的资料, 还是出版的书籍种种, 版本大都在1.6~2.0不等, 且资源零零散散, 需要到处百度, 搜罗资源.

但根据个人开发了一段时间的感觉来看, 会遇到的绝大多数问题, 都可以在官方文档中找到答案.

因此也可以理解为这是官方文档的部分翻译.

个人英文水平有限, 如有错漏欢迎指正.

就目前来看, 主要分为这样几个板块.

  1. Spark Streaming Programming Guide 也即SparkStreaming编程指南.

  2. Submitting Applications Spark部署发布相关

  3. Tuning Spark Spark调优

  4. Spark Configuration Spark可用配置, 可选参数.

目前已经有了Spark Streaming的中文翻译. 参考:

Spark Streaming编程指南

Spark编程指南

内容本身会比较多, 因此会拆开来, 分多篇介绍.

在这里就不从word count的简单示例开始了, 而是直接从基础概念开始.

Maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.3</version>
    <scope>provided</scope>
</dependency>

而同样当前版本对应的中间件:

Source  Artifact
Kafka   spark-streaming-kafka-0-10_2.12
Flume   spark-streaming-flume_2.12
Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]

而更完整的, 更新的中间件 Maven 仓库路径为:

Maven repository

如果觉得欠缺什么, 不妨找找试试.

初始化Streaming Context

为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创建出来,它是所有的 Spark Streaming 功能的主入口点.

有两种创建方式:

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

其中:

appName 是在Spark UI上展示所使用的名称.

master 是一个 Spark, Mesos or YARN cluster URL, 不了解也没关系, 这部分会在Spark-submit时介绍到.

master 这个指的是Spark项目运行所在的集群. 如果想在本地启动SparkStreaming项目: 可以使用一个特殊的 “local[*]” , 启动Spark的本地模式, *表示会自动检测系统的内核数量.

然而在集群环境下, 一般不采用硬编码的方式使用spark, 即 setMaster. 我们有更好的方式 通过 spark-submit 在提交时指定master参数即可.

需要注意到的是, 这句代码会在内部创建一个 SparkContext, 可以通过 ssc.sparkContext 访问使用.

batch interval 也即 new Duration(1000) 在这里指的是毫秒值, 还可以采用Durations来创建.

Durations.seconds(5)

这个时间, 必须根据您的应用程序和可用的集群资源的等待时间要求进行设置.

另一种创建 SparkStreamingContext的方式为:

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

在已经有了Context之后, 我们需要做的是:

  1. 创建Input DStreams, 如kafka就有相应的方法可以创建DStream

  2. 对输入流做 转换 处理, 也即我们的功能部分.

  3. 开始接收输入并且使用 streamingContext.start() 来处理数据.

  4. 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).

  5. 使用 streamingContext.stop() 来手动的停止处理.

同时, 有几点需要注意的地方:

  • 一旦一个 context 已经启动,将不会有新的数据流的计算可以被创建或者添加到它.

  • 一旦一个 context 已经停止,它不会被重新启动.

  • 同一时间内在 JVM 中只有一个 StreamingContext 可以被激活. 也即假设在使用SparkStreaming的同时, 需要依赖 SparkContext 或 SparkSQL等做一些操作, 此时不能重新创建 SparkContext 或是 SparkSQL(因为SparkSQL依然会创建Context.) 需要直接使用ssc.sparkContext.

  • 调用 ssc.stop() 会在停止 SparkStreamingContext的同时 停止 SparkContext. 如果需要仅停止 StreamingContext,需要使用 ssc.stop(false);

  • 一个 SparkContext 就可以被重用以创建多个 StreamingContexts,只要前一个 StreamingContext 在下一个StreamingContext 被创建之前停止(不停止 SparkContext, 即使用 ssc.stop(false)).

在这里额外添加一条说明, ssc.stop 可以接收第二个参数, 是指定是否执行完当前批次的剩余数据.

Discretized Streams

Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象.

有且仅有两种方式创建一个DStream, 第一种是通过 Spark的API去创建流, 第二种是从一个流转换成另一个流.

在内部,一个 DStream 被表示为一系列连续的 RDDs,它是 Spark 中一个不可改变的抽象,distributed dataset.在一个 DStream 中的每个 RDD 包含来自一定的时间间隔的数据,如下图所示.

SparkStreaming

应用于 DStream 的任何操作转化为对于底层的 RDDs 的操作.例如,在 先前的示例,转换一个行(lines)流成为单词(words)中,flatMap 操作被应用于在行离散流(lines DStream)中的每个 RDD 来生成单词离散流(words DStream)的 RDDs.如下所示.

SparkStreaming

因此对于RDD支持的操作, DStream也基本都支持.

Input DStreams 和 Receivers(接收器)

输入 DStreams 是代表输入数据是从流的源数据(streaming sources)接收到的流的 DStream.每一个 input DStream(除了 file stream 之外)与一个 Receiver 对象关联,它从 source(数据源)中获取数据,并且存储它到 Spark 的内存中用于处理.

receiver的java代码如下:

class MyReceiver extends Receiver<String> {

    public MyReceiver(StorageLevel storageLevel) {
        //StorageLevel表示存储级别
        super(storageLevel);
    }

    public void onStart() {
        //1. 启动线程, 打开Socket连接, 准备开始接收数据
        //2. 启动一个非阻塞线程去接收数据.
        //3. 调用Store方法将数据存储到 Spark的内存中, store方法有多种实现,支持将多种多样的数据进行存储.
        //4. 在发生错误或异常时根据自身的处理策略调用stop, restart, reportError 方法.
    }

    public void onStop() {
        //清理各种线程,未关闭的链接等等
    }
}

Spark Streaming 提供了两种内置的 streaming source(流的数据源).

  • Basic sources(基础的数据源):在 StreamingContext API 中直接可以使用的数据源.例如:file systems 和 socket connections.

  • Advanced sources(高级的数据源):像 Kafka,Flume,Kinesis,等等这样的数据源.可以通过对应的maven repository 找到依赖.

需要注意到的是, 如果你想要在你的流处理程序中并行的接收多个数据流,你可以创建多个 input DStreams.这将创建同时接收多个数据流的多个 receivers(接收器),然而,一个 Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给 Spark Streaming 的应用程序的所有核中的一个核(core).

因此,需要记住,一个 Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s)).

因此相应的就需要在创建master的时候 不要使用local[1] 或 local 仅分配一个线程, 这将会使得receiver得到一个线程 而对应的程序则没有线程可以处理.

在集群模式下, 则需要分配适当的核心数.

而在使用中, 我的数据源是来自于kafka, 使用的是 kafkaUtils.createDirectStream. 而使用的 core数只有1, 或采用 local[1] 也能够正常运行, 这是不是说明上面的说法是错误的呢?

并不是, 由KafkaUtils.createDirectStream 创建的是DStream, 而并非单纯的使用 receiver的方式实现.

如果采用了自定义的 receiver, 那么此时通过 javaSparkContext.receiveData() 的方式创建流, 就至少需要两个线程, 或两个核心才能够正常运行.

上一篇:spark streaming基础


下一篇:具有安全性的300MB视频流(PHP)