预览
Spark Streaming是Spark核心API的扩展,支持高扩展,高吞吐量,实时数据流的容错流处理。数据可以从Kafka,Flume或TCP socket等许多来源获取,并且可以使用复杂的算法进行处理(比如map,reduce,join,window等高级函数)。最终,处理的结果数据可以推送到文件系统,数据库或实时仪表盘上。
在内部,它的工作原理如下图。Spark Streaming接收实时输入数据流并将数据分成批,然后由Spark引擎处理,进而批量生成最终结果流。
Spark Streaming提供了一个高层次的抽象,称为DStream(离散流),它代表连续数据流。DStream可以通过Kafka,Flume等来源的输入数据流创建,也可以通过在其他DStream上应用高级函数来创建。在内部,一个DStream被表示为一系列RDD。
本指南将向你介绍如何使用DStream编写Spark Streaming程序。
一个快速例子
假如我们想统计从监听TCP套接字的数据服务器接收到的文本数据中的字数。
java代码示例:
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir", "D:/Users/KOUSHENGRUI976/winutils-master/winutils-master/hadoop-2.6.4");
SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999);
JavaDStream<String> words = lines.flatMap(p -> Arrays.asList(p.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(p -> new Tuple2<>(p, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
首先,我们创建一个JavaStreamingContext对象,这是所有Spark Streaming程序的入口。使用这个StreamingContext对象创建一个DStream来表示来自TCP源的流数据,指定主机(比如127.0.0.1)和端口(比如9999)。以上lines表示从数据服务器接收的数据流,流中每条记录都是一行文本。之后,我们把lines用空格分割成words。flatMap是一个转换操作,它通过从源DStream中的每条记录生成多个新记录来创建一个新的DStream。在这种情况下,每一行被分成多个单词,单词流被表示为words DStream。flatMap()方法的参数是一个FlatMapFunction对象(接收一个入参,返回一个Iterator对象)。接下来,我们要统计这些单词:words DStream通过mapToPair()方法,进一步映射成pair DStream,然后调用reduceByKey()方法,求每批数据中每个单词的出现频率。注意,执行以上行时,Spark Streaming只会设置它在启动后执行的计算,并未实际处理。最终我们调用StreamingContext 的start()方法,在所有转换完成之后开始处理。完整的代码可以参阅JavaNetworkWordCount。
如果你已经下载并且构建了Spark,你可以以下面方式运行这个例子。你首先需要运行Netcat(如果提示nc命令找不到的话,就yum install nc 安装即可),来作为数据服务器:
$ nc -lk
然后,在一个不同的终端,你可以通过如下命令启动这个例子:
$ ./bin/run-example streaming.JavaNetworkWordCount localhost
run-example文件在SPAKR_HOME/bin目录中。
观察现象http://www.jianshu.com/p/59733597d448
基本概念
引入jar包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
</dependency>
如果数据来源是Kafka或者Flume的话,还需要引入整合jar包。Kafka整合jar包版本在Spark Streaming + Kafka Integration Guide有讨论:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.2.1</version>
</dependency>
初始化StreamingContext
开发Spark Streaming应用时,StreamingContext对象(java语言中是JavaStreamingContext对象)是所有Spark Streaming程序的入口。JavaStreamingContext对象可以通过SparkConf对象创建。这会在内部创建一个JavaSparkContext对象,可以通过jssc.sparkContext()来获取。批处理间隔必须根据应用程序和可用集群资源的延迟要求来设置。
JavaStreamingContext对象也可通过现有的JavaSparkConext对象来创建:
import org.apache.spark.streaming.api.java.*; JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
定义JavaStreamingContext对象后,你必须执行以下操作:
1.通过创建input DStream来定义输入源。
2.通过对DStream应用转换和输出操作定义流式计算。
3.调用JavaStreamingContext对象的start()方法开始接收数据并处理。
4.调用JavaStreamingContext对象的awaitTermination()方法来等待处理停止(人为结束或者因为错误结束)。
5.可以通过调用JavaStreamingContext对象的stop()方法来人为停止处理。
需要注意的几点:
1.一旦一个JavaStreamingContext对象启动(调用start()方法)后,就不能建立或者添加新的流式计算了。
2.一旦一个JavaStreamingContext对象停止后,就不能再重新启动了。
3.在一个JVM中,最多只有一个StreamingContext可以处于活跃状态。
4.stop()方法会在停止JavaStreamingConext的同时停止SparkContext,如果只想停止JavaStreamingContext,则需要调用有参的stop()方法,参数值为false。
5.可以使用一个SparkContext对象来创建多个JavaStreamingContext对象,只要先前的JavaStreamingContext对象在下一个JavaStreamingContext对象被创建之前停止(不停止SparkContext)就可以。
离散流(DStream)
离散流(DStream)是Spark Streaming中提供的基本抽象。它代表了一个持续的数据流,或者是从源接收的输入数据流,或者是通过转换输入流而生成的处理过的数据流。在内部,DStream由连续的RDD表示,每个RDD都包含一定时间间隔的数据,如下图所示:
在DStream上应用的任何操作都会转换为对RDD的操作。例如,在前面将lines DStream转换成words DStream的例子中,flatMap操作是应用到lines DStream中的每一个RDD的,进而生成words DStream中的RDD。如下图所示:
RDD的这些转换由Spark引擎处理。DStream操作隐藏了大部分细节,并为开发人员提供了一个更高级别的API。这些操作将在后面的章节中详细讨论。
输入DStream和接收器
输入DStream代表从流数据源接收的DStream。在上面例子中,lines就是一个输入DStream,它代表了从netcat服务器接收到的流数据。每一个输入DStream(除了文件流)都与一个接收器(Receiver)对象相关联,该对象从一个源接收数据并将数据存储在Spark的内存中以供处理。
Spark Streaming提供了两类内置的流数据源:
1.基本数据源:比如文件系统、socket连接。
2.高级数据源:像Kafka,Flume等其他中间件,客户端编程时需要引入额外的jar依赖。
如果希望在流处理程序中同时接受多个数据流,则需要创建多个输入DStream。这将创建多个接收器,同时接受多个数据流。
需要注意的是,每个接收器都会占用一个核(或者是线程,如果是以本地模式运行的话)。因此,当在本地运行Spark Streaming程序时,不要使用"local"或者"local[1]"作为master URL,这两个都意味着只有一个线程运行任务。如果你使用基于接收器的输入DStream,则单线程将用于运行接收器,而不会处理接收到的数据。同样的,以集群模式运行时,分配给Spark Streaming程序的核心数也必须大于接收器数量,否则也只是接收数据而不做处理。
基本数据源
在上面例子中,我们已经看了jssc.socketTextStream("127.0.0.1", 9999),它通过TCP socket接收文本数据并创建一个输入DStream。除了socket外,StreamingContext API还提供了从文件创建输入DStream的方法。
文件流:为了从与HDFS API兼容的任何文件系统上(如HDFS,S3,NFS等)的文件中读取数据,可以通过以下方式创建输入DStream:
jssc.fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass);
Spark Streaming将监视directory目录,并处理在该目录中创建的所有文件(不包括子目录中的文件)。注意:
1.这些文件必须具有相同的数据格式。
2.这些文件必须通过通过原子移动或者重命名来创建。
3.一旦移动,文件不得更改。所以即使文件被连续追加,新的数据将不会被读取。
对于简单的文本文件,有一个更简单的方法:
jssc.textFileStream(String directory);
文件流不需要运行接收器,所以不需要分配内核。
高级数据源
这类数据源需要整合其他第三方中间件,比如Kafka和Flume。在编程方面,需要引入一些与第三方中间件整合使用的依赖jar,从这些源创建DStream的功能被移到了jar包中。
其中一些高级数据源如下:
Kafka:Spark Streaming2.2.1与Kafka broker0.8.2.1及更高版本兼容。有关更多详细信息,请参阅Spark Streaming + Kafka Integration Guide。
Flume:Spark Streaming2.2.1与Flume1.6.0及更高版本兼容。有关更多详细信息,请参阅Spark Streaming + Flume Integration Guide。
接收器可靠性
数据源根据其可靠性可以分为两类。像Kafka和Flume这种数据源,允许传输的数据被确认。如果从这些可靠的数据源接收数据的系统正确地确认接收到的数据,则可以确保没有数据由于任何故障而丢失。这导致两种接收器:
1.可靠的接收器,一个可靠的接收器在收到数据并将数据存储在Spark中时,可以正确地向可靠的数据源发送确认。
2.不可靠的接收器,不可靠的接收器不会向数据源发送确认。这可以用于不支持确认的数据源,或者不希望或者不需要确认的数据源。
DStream上的转换操作
Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows:
map(Function func):
flatMap(FlatMapFunction func)
filter(Function<T, Boolean> func)
repartition(int numPartitions)
union(JavaDStream otherDStream) union(JavaPairDStream otherPairDStream)
count() 得到一个JavaDStream<Long>实例
reduce(Function2 func)
countByValue()
reduceByKey(Function2 func) 仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例
join(JavaPairDStream otherPairDStream) 仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例
cogroup(JavaPairDStream otherPairDStream) 仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例
transform(Function func):在源DStream中的每个RDD上应用任意的RDD-RDD函数,返回一个新的DStream。
updateStateByKey(Function2 updateFunc) 仅可由JavaPairDStream实例调用,返回一个新的JavaPairDStream实例
下面针对一些转换操作做更详细的解释。
UpdateStateByKey操作
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information。使用这个转换操作需要两个步骤:
1.定义状态。状态可以是任意的数据类型。
2.定义状态更新函数。用函数指定如何使用之前的状态和来自输入流的新值来更新状态
在每一批数据中,Spark会对所有现有的key应用状态更新函数,不管批数据是否有新数据。如果状态更新函数返回none,则键值对将被消除。
我们来举个例子说明一下。假设你想保持在文本数据流中出现的每个单词出现的次数。在这里,出现的次数是状态,它是一个整数。我们将更新函数定义为:
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum);
};
这适用于包含单词的DStream,例如,上面例子中的包含(word, 1)对的pairs DStream。
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
上例中的Optional类不是jdk8自带的那个,而是org.apache.spark.api.java.Optional,在spark-core.jar中。Function2是个函数式接口,全类名是org.apache.spark.api.java.function.Function2,也在spark-core.jar中,类似的函数式接口还有Function、Function0、Function3、Function4。
The updateFunction will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count。需要注意的是,使用updateStateByKey必须配置checkpoint目录,将在下面的checkpointing章节详细讨论。完整的Java代码,请参阅JavaStatefulNetworkWordCount.java
Transform操作
transform操作及其变形(如transformWith、transformToPair、transformWithToPair),允许任意RDD-RDD函数应用于DStream。它可以用于应用那些尚未在DStream API中公开的RDD操作。例如,将数据流中的每个批次与其他数据集连接起来的功能没有直接暴露在DStream API中。但是,你可以使用transform操作完成这个功能。实际应用场景:通过将输入数据流与预先计算的垃圾信息进行实时数据清理,然后基于此进行过滤。
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
});
Note that the supplied function gets called in every batch interval。这允许你执行随时间变化的RDD操作,即可以在批次之间更改RDD操作,更改分区数量或者广播变量等等。
窗口操作
Spark Streaming还提供了窗口化的计算,允许你在滑动的数据窗口上应用转换操作。下图阐释了滑动窗口:
如上图所示,每当窗口滑过源DStream时,窗口内的源RDD被组合并操作以产生窗口DStream的RDD。上图中,操作被应用在最后3个时间单位的数据上,并且滑动2个时间单位。这表明任何窗口操作都需要指定两个参数:
1.窗口长度。窗口的持续时间(图中是3个时间单位)
2.滑动间隔。执行窗口操作的间隔(图中是2个时间单位)
这两个参数必须是源DStream的批次间隔的整数倍。
下面以一个例子来说明窗口操作。我们扩展下前面的例子,现在要统计前30s内出现的单词的个数,每10s统计一次。To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow。
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
reduceByKeyAndWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration),返回一个JavaPairDStream实例。
一些常用的窗口操作如下,它们都需要以上所说的两个参数:
window(Duration windowDuration, Duration slideDuration)
countByWindow(Duration windowDuration, Duration slideDuration)
countByValueAndWindow(Duration windowDuration, Duration slideDuration)
reduceByWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration)
reduceByKeyAndWindow(Function2 reduceFunc, [Function2 invReduceFunc,] Duration windowDuration, Duration slideDuration)
Join操作
最后,值得强调的是,可以轻松地在Spark Streaming中执行不同类型的连接。
Stream-Stream joins(仅JavaPairDStream有join相关方法,JavaDStream没有)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
在这里,在每个批间隔中,由stream1生成的RDD将与由stream2生成的RDD连接。除了join(),还有leftOuterJoin()、rightOuterJoin()。
此外,在windowed stream上进行连接也是很常用的:
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
Stream-dataset joins
这在DStream的transform操作章节已经讲过了。现在我们用一个windowed stream连接一个dataset:
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
这里解释下,DStream与RDD连接不是DStream.join(RDD),DStream与RDD维度不一样,DSteam是一连串的RDD,所以DSteam与RDD连接,其实指的是DStream中的RDD与RDD连接。
DStream上的输出操作
输出操作允许将DStream的数据推送到外部系统,如数据库或者文件系统。像RDD的行为操作和转换操作的关系一样,DStream输出操作会触发转换操作的实际执行。
有以下常见输出操作:
print():在driver node上打印DStream每一批数据的前10个元素,仅用于开发和调试。
saveAsHadoopFiles(String prefix, String suffix)
saveAsNewAPIHadoopFiles(String prefix, String suffix)
foreachRDD(VoidFunction func)、foreachRDD(VoidFunction2 func):对DStream/PairDStream的每个RDD都应用func函数。
正确使用foreachRDD
foreachRDD()功能很强大,允许将数据发送到外部系统。但是,正确且高效使用该方法却不简单,一些常见的错误如下:
通常,将数据写入外部系统需要创建连接对象,并使用它来将数据发送到远程系统。为此,开发人员可能会在Spark driver中创建连接对象,然后在Spark worker上使用它来将RDD的数据发送出去。如下代码:
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
这是不正确的,因为这需要将连接对象序列化并从driver发送到worker。这种连接对象是很难跨机器传输的,这种错误可能表现为序列化错误(连接对象不可序列化)或初始化错误(连接对象需要在worker初始化)等。正确做法是在worker创建连接对象。然而,这将会导致另一个常见错误,即为每一个记录创建一个连接对象。代码如下:
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
通常情况下,创建一个连接对象需要时间和资源的开销。所以,为每条记录创建和销毁连接对象可能会产生不必要的高开销,并且会显著降低系统的吞吐量。
更好的解决方案是使用rdd.foreachPartition(),为每个分区创建一个连接对象,并使用该连接发送分区中的所有记录。代码如下:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
最后,通过跨批次重用连接对象,可以进一步优化。我们可以维护一个静态的连接对象池,里面的连接对象可以被重用。代码如下:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
至此,就是将数据发送到外部系统的最有效的解决方案。我们再回顾一下这个方案,首先对DStream执行foreachRDD()方法,去操作每个RDD的数据,然后,对每个RDD执行foreachPartition()方法,去操作每个RDD的每一个分区的数据。此外,操作数据的连接要从连接池中拿,并且是懒初始化和有过期时间的。
其他需要注意的几点:
1.如果应用程序没有对DStream执行输出操作的话,那么根本不会执行任何操作。系统只会简单地接收数据并丢弃它。
2.默认情况下,输出操作是一次一个执行的,而且他们是按照在应用程序中定义的顺序执行的。
DataFrame和SQL操作
可以轻松使用DataFrame和SQL操作流数据。你必须使用StreamingContext正在使用的SparkContext来创建一个SparkSession对象。此外,必须这样做才能在驱动程序故障时重启。这是通过懒加载创建一个SparkSession单例实现的。下面的例子展示了这一点,它修改了之前的单词计数示例,使用DataFrame和SQL生成字数。每个RDD都转换为一个DataFrame,注册为一个临时表,然后使用SQL进行查询。
/** Java Bean class */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public JavaRow(String word) {
this.word = word;
}
} /** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...
words.foreachRDD((rdd, time) -> {
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaRDD<JavaRow> rowRDD = rdd.map(word -> new JavaRow(word));
Dataset<Row> wordsDF = spark.createDataFrame(rowRDD, JavaRow.class);
wordsDF.createOrReplaceTempView("words");
Dataset<Row> wordCountsDF = spark.sql("select word, count(*) as total from words group by word");
wordCountsDF.show();
});
查看完整的源代码。JavaSqlNetworkWordCount.java
MLib操作
用的时候再学。
缓存/持久化 Caching/Persistence
同RDD类似,DStream也允许开发人员将流数据保存在内存中。也就是说,调用DStream的persist()方法会自动将该DStream中的每个RDD都保存在内存中。如果DStream中的数据被多次计算的话,这将非常有用。对于像reduceByWindow()和reduceByKeyAndWindow()这样的基于窗口的操作以及像updateStateByKey()这样的基于状态的操作,这是隐含的。因此,基于窗口的操作生成的DStream会自动持久化到内存中,开发人员无需显式调用persist()方法。
对于通过网络接收数据的输入流,例如Kafka,Flume,sockets等,默认持久化级别是将数据复制到两个节点以实现容错。
请注意,与RDD不同,DStream默认持久化级别使数据在内存中保持序列化。这在Performance Tuning进一步讨论。
检查点 Checkpointing
流应用必须全天候运行,所以必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具有恢复能力。为了做到这一点,Spark Streaming需要存储足够的信息到容错的存储系统,以便从故障中恢复。有两种类型的检查点数据:
1.元数据检查点。将定义流式计算的信息保存到HDFS等容错系统中。This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later)。元数据包括:
①配置--用于创建流应用程序的配置
②DStream操作--定义流应用程序的操作集合
③incomplete batches--Batches whose jobs are queued but have not completed yet
2.数据检查点。将生成的RDD保存到可靠的存储系统中。这在将多个批次的数据组合在一起的有状态转换中是必需的。在这样的转换中,生成的RDD依赖于之前批次的RDD,导致依赖链的长度随着时间的推移不断的增加。为了避免恢复时间的这种无限增长(与依赖链成比例),有状态转换的中间RDD被周期性地保存到可靠存储系统中(例如HDFS),以切断依赖链。
何时启用检查点
必须为具有以下任何要求的应用程序启用检查点:
1.程序中有状态转换操作--如果在程序中使用了updateStateByKey()或者reduceByKeyAndWindow(with inverse function),那么就必须提供检查点目录以允许周期性地保存中间RDD。
2.从运行应用程序的驱动程序的故障中恢复--Metadata checkpoints are used to recover with progress information。
请注意,没有应用上述状态转换操作的简单的流应用程序可以不启用检查点。在这种情况下,从驱动程序故障中恢复也将是部分的(一些接收到但未处理的数据可能会丢失)。这通常是可以接受的,许多都以这种方式运行Spark Streaming应用程序。预计对非Hadoop环境的支持未来将得到改善。
如何配置检查点
检查点可以通过设置一个在容错的、可靠的文件系统(例如HDFS、S3)中的目录来启用,检查点信息将被保存到该文件系统中。这是通过调用StreamingContext实例的checkpoint(String directory)方法来实现的。这将允许你使用上述的状态转换操作。此外,如果你想让应用程序从驱动程序故障中恢复,则应该重写你的流应用程序以具有以下行为:
当程序第一次启动时,它会创建一个新的StreamingContext实例,设置所有的流,然后调用start()方法。
程序在失败后重启时,将会根据检查点目录中的检查点数据重新创建一个StreamingContext实例。
这种行为可以通过JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法很简单地实现:
String checkpointDirectory = "hdfs://192.168.100.100:9000/checkpoint/application1";
Function0<JavaStreamingContext> creatingFunc = () -> {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingText2");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// set checkpoint directory
jssc.checkpoint(checkpointDirectory);
return jssc;
}; // Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, creatingFunc); // Do additional setup on context that needs to be done, irrespective of whether it is being started or restarted
// context. ... // Start the context
context.start();
try {
context.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法,如果checkpointPath存在,则将根据检查点数据重新创建JavaStreamingContext实例。如果checkpointPath不存在(即第一次运行),那么creatingFunc函数将被调用,来创建一个新的JavaStreamingContext实例并设置输入DStream。
除了使用getOrCreate()方法外,还需要确保驱动程序进程在失败时自动重新启动。这只能通过运行应用程序的部署基础结构来完成,在下面的部署应用章节有详细的讨论。
请注意,RDD的检查点导致了存储到可靠存储系统的成本。这可能会导致RDD检查点的批处理时间变长。因此,检查点需要小心设置。在批间隔很小的情况下(例如1s),检查点可能会显著降低吞吐量。而且,如果检查点过于频繁,则会导致谱系和任务规模增长,这可能有不利影响。
对于需要RDD检查点的有状态转换操作,默认的间隔是批间隔的整数倍。它可以通过调用DStream的checkpoint(Duration checkpointInterval)方法来设置。通常情况下,检查点间隔设置为滑动间隔(如果有的话)的5-10倍是一个很好的尝试。
累加器,广播变量和检查点
累加器和广播变量是不能从检查点恢复的。如果你启用了检查点,并使用了累加器和广播变量,那么你必须创建一个懒实例化的累加器和广播变量单例,以便在驱动程序重新启动时重新实例化这些实例。
见下面例子:
class JavaWordBlacklist { private static volatile Broadcast<List<String>> instance = null; public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordBlacklist.class) {
if (instance == null) {
List<String> wordBlacklist = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordBlacklist);
}
}
}
return instance;
}
} class JavaDroppedWordsCounter { private static volatile LongAccumulator instance = null; public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
}
}
}
return instance;
}
} wordCounts.foreachRDD((rdd, time) -> {
// Get or register the blacklist Broadcast
Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use blacklist to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(wordCount -> {
if (blacklist.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
}
全部代码请参阅JavaRecoverableNetworkWordCount.
部署应用
本节讨论部署Spark Streaming应用程序的步骤。
要求
要运行一个Spark Streaming应用程序,你需要以下:
1.具有集群管理器的集群--这是任何Spark应用程序的一般要求。
2.打包应用程序jar--你必须将你的流应用程序编译成jar包。
3.为executor节点配置足够的内存--由于接收到的数据必须存储在内存中,executor节点必须配置足够的内存以保存接收到的数据。请注意,如果你正在进行10分钟的窗口操作,则系统必须在内存中保留至少10分钟的数据。所以应用程序需要的内存取决于在其中使用的操作。
4.配置检查点 - 如果流应用程序需要配置检查点,那么必须将Hadoop API兼容的容错存储系统(HDFS)中的一个目录设置为检查点目录,使得流应用能以检查点信息来进行故障恢复。
5.配置驱动程序的自动重启 - 为了能自动从驱动程序故障中恢复,运行流应用程序的部署基础架构必须监视驱动程序进程,并在驱动程序失败时重新启动驱动程序。不同的集群管理器有不同的工具来实现:
①Spark Standalone - Spark驱动应用程序可以提交到Spark Standalone集群中运行。也就是说,驱动应用程序本身在其中一个工作节点上运行。此外,Standalone集群管理器可以被指示监督驱动应用程序,并在驱动程序由于非零退出代码或者由于所在的节点故障而失败时重新启动它。查看Spark Standalone来获取更多详情。
②YARN - YARN支持类似的机制来重启应用程序。
6.配置预写入日志 - Spark可以启用预写入日志来实现强大的容错保证。启用之后,从接收器接收到的所有数据将被写入到检查点目录中的预写入日志中。这可以避免驱动程序恢复时数据丢失,从而确保数据零丢失(在下面容错语义部分详细讨论)。启用的方式是设置配置参数spark.streaming.receiver.writeAheadLog.enable为true。然而,这些更强语义可能是以单个接收器的接收吞吐量为代价的。这可以通过并行运行更多的接收器来增加总吞吐量来纠正。此外,建议在启用预写入日志时,Spark接收数据不要存储副本,因为相当于一个副本的日志已存储在容错存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来实现。查看Spark Streaming Configuration获取更多详情。请注意,启用I/O加密时,Spark不会加密写入预写入日志的数据。如果需要对预写入日志的数据进行加密,则应将其存储在原生支持加密的文件系统中。
7,.设置最大接收速率 - 如果集群资源不足以使流式应用程序处理数据的速度与接收速度一样快,从1.5开始,Spark提供了一个叫做backpressure的特性,启用此特性后,Spark Streaming会自动调整速率限制。启用方式是设置配置参数spark.streaming.backpressure.enabled为true。
升级应用程序代码
如果正在运行的Spark Streaming应用程序需要使用新的应用程序代码升级,则有两种机制可供选择:
1.新的Spark Streaming应用程序启动并与旧应用程序并行运行。一旦新应用程序(接收的数据与旧应用程序相同)被预热好,旧应用程序就可以停止。请注意,在数据源支持发送数据到两个目的地的情况下,这种机制才可以使用。
2.旧应用程序平滑关闭(调用JavaStreamingContext的stop()方法),以确保已接收到的数据在关闭完被处理。然后启用新应用程序,该应用程序从旧应用程序停止的同一点开始处理。请注意,只有使用支持源端缓冲的数据源(如Kafka和Flume)才能完成此操作,因为数据需要在旧应用程序关闭、新应用程序启动前进行缓冲。同时,还需要使用不同的检查点目录来启动新应用程序,或者删除之前的检查点目录。因为检查点信息本质上包含序列化的Scala/Java/Python对象,试图用新应用程序的类来反序列化这些对象可能会出错。
监控应用程序
除了Spark本身的监控功能外,Spark Streaming还提供了一些其他特定功能。当使用StreamingContext时,Spark Web UI会显示一个额外的Streaming选项卡,它显示有关正在运行的接收器(接收器是否处于活动状态,接收到的记录数量,接收器错误等)和已完成的批次(批处理时间,排队延迟等),这可用来监控流应用程序的进度。
Web UI中的以下两个指标尤为重要:
1.处理时间 - 处理每批数据的时间。
2.Scheduling Delay - 批次在队列中等待处理先前批次的时间(the time a batch waits in a queue for the processing of previous batches to finish)。
如果批处理时间一直比批处理间隔大,或者scheduling delay持续增大,则表示系统处理数据的速度比不上接收数据的速度。在这种情况下,考虑如何减少批处理时间。
Spark Streaming程序的进度也可以使用StreamingListener接口进行监控,该接口允许你获取接收器状态和处理时间等信息。
性能调整
使集群上的Spark Streaming应用程序表现出最佳的性能需要进行一些调整。本节介绍可调整的参数和配置,以提高应用程序的性能。从较高层次考虑,你需要考虑两件事情:
1.通过有效利用集群资源减少批处理时间。
2.设置正确的批次大小,以使得批处理的速度能跟得上接收数据的数据。
减少批处理时间
Spark有很多优化可以使每个批次的处理时间最少。这些在Tuning Guide有详细讨论。本节介绍一些最重要的内容:
数据接收的并行度
通过网络接收数据(如Kafka,Flume,socket等)需要将数据反序列化并存储到Spark中。如果数据接收成为系统中的瓶颈,则可以考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收器。因此可以通过创建多个输入DStream并配置它们接收流数据源的不同分区。例如,单个接收两个主题数据的输入DStream可以被切分为两个输入DStream,每个输入DStream接收一个主题的数据。这将创建两个接收器,并行接收数据,从而提高整体吞吐量。多个DStream可以通过union操作连接成一个DStream。如下;
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
另一个应该考虑的参数是接收器的块间隔,它由配置参数spark.streaming.blockInterval决定。对于大多数接收器来说,接收到的数据在存储在Spark的内存之前会被合并成块。每个批次的块数决定了将用于处理数据的任务数量。块数=batch interval / block interval。例如,批间隔设置为2s,块间隔设置为200ms,那么会创建10个任务。如果任务数太少(少于每台机器的内核数),那么效率将会很低,因为不是所有的内核都用于处理数据。要增加任务数量,就需要减少块间隔。但是,建议的块间隔最少为50ms,低于此值,任务启动开销可能会成为问题。
用多个输入DStream、多个接收器去接收数据的另一种方法是显示重新分配输入DStream的分区(调用inputStream.repartition(int numPartitions))。
数据处理的并行度
如果在计算的某些阶段使用的并行任务数量不够高,则集群资源可能未被充分利用。例如,对于像reduceByKey和reduceByKeyAndWindow这样的分布式reduce操作,并行任务数量的默认值由spark.default.parallelism配置属性控制。你可以将并行级别当作一个参数传递,或者设置上述属性以更改默认值。
数据序列化
数据序列化的开销可以通过调整序列化格式来减少。在Spark Streaming中,有两种类型的数据被序列化;
1.输入数据。默认情况下,通过接收器接收到的输入数据会以StorageLevel.MEMORY_AND_DISK_SER_2级别存储在executor的内存中。useDisk,useMemory,not useOffHeap,not deserialized,2 replication。也就是说,将数据序列化为字节以减少GC开销,并有2个副本用于容错。此外,数据首先保存在内存中,并且只有当内存不足以保存流式计算所需的所有输入数据时才会溢出到磁盘。这个序列化显然有CPU开销 - 接收器必须反序列化接收的数据,并使用Spark的序列化格式重新序列化它。
2.持久化通过流操作生成的RDD。由流计算生成的RDD可能会持久化到内存中。例如,窗口操作会将数据存储到内存中,因为它们将被多次处理。但是,与Spark Core默认的StorageLevel.MEMORY_ONLY级别不同,流式计算生成的RDD持久化级别是StorageLevel.MEMORY_ONLY_SER,以最大限度地减少GC开销。
在以上两种情况下,使用Kryo序列化既可以减少CPU又可以减少内存开销。有关更多详细信息,查看Spark Tuning Guide。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪。查看Spark Configuration。
如果需要为流应用程序保存的数据量不大的话,可以将数据保存为反序列化对象而不会导致过度的GC开销。例如,如果使用几秒钟的批间隔并且没有窗口操作,那么你可以通过显式设置存储级别来禁用持久化数据的序列化。这可以减少由于序列化造成的CPU开销,并可能在不增加太多GC开销的情况下提高性能。综上,序列化会造成很较大的CPU开销,但是减少GC开销。
任务启动开销
如果每秒启动的任务很多(比如说50或者更多),那么向slaves发送任务的开销就会很大,这使得很难达到亚秒级的延迟。可以通过以下更改来减少开销:
Execution mode:以Spark Standalone模式运行Spark或者以粗粒度的Mesos模式运行Spark会比以细粒度的Mesos模式运行Spark有更好的任务启动时间。查看Running Spark on Mesos获取更多详情。
这种更改可能会使批处理时间减少100ms,从而允许亚秒批大小可行。亚秒:没有达到秒,也就是说不到一秒。
设置合适的批间隔
为了让Spark Streaming应用程序稳定地运行在集群上,系统应该有能力像接收数据一样快速处理数据。换句话说,数据处理应该像数据生成一样快。通过Web UI,我们可以看到批处理时间是否比批间隔小,进而判断程序是否稳定。
为你的应用程序找出合适的批大小的一个好方法是用一个保守的批间隔(比方说,5-10s)和较低的数据速率进行测试。要验证系统是否能够跟上数据速率,可以检查每个处理过的批次所经历的端到端延迟的值(可以在Spark驱动程序log4j日志中查找"Total delay",或者使用StremingListener接口)。如果延迟保持与批大小相当,那么系统是稳定的。否则,如果延迟不断增加,则意味着系统无法跟上,不稳定。一旦你有了一个稳定的配置,你可以尝试提高数据速率或者减小批大小。注意,只要延迟小于批大小,由于数据速率增加而引起的瞬时延迟增加是正常的。
内存调整
在Tune Spark中详细讨论了调整Spark应用程序的内存使用情况和GC行为,必须阅读。在本节中,我们将专门讨论Spark Streaming应用程序中的一些调优参数。
Spark Streaming应用程序需要的内存大小很大程度上取决于所使用的转换操作。例如,如果你想在最后10分钟的数据上使用窗口操作,那么你的集群应该有足够的内存来存储10分钟的数据。或者你想在有很多key的情况下使用updateStateByKey操作,那么就需要很多内存。相反,如果你想做一个简单的map-filter-store操作,那么就不需要很多内存。
通常,因为通过接收器接收的数据是以StorageLevel.MEMORY_AND_DISK_SER_2级别存储的,内存中放不下的数据会溢出到磁盘,这可能会降低流应用程序的性能,因此建议根据流应用程序的需要提供足够的内存。最好尝试一下小规模的内存使用情况并做相应的评估。
内存调整的另一个方面是GC(垃圾回收)。对于需要低延迟的流应用程序,不希望因为JVM垃圾回收而造成大量暂停。有几个参数可以帮你调整内存使用情况和GC开销:
1.DStream的持久化级别:如在上面数据序列化部分所说,输入数据和RDD默认以序列化的字节被持久化。与反序列化的持久化相比,这同时减少了内存使用和GC开销。启用Kryo序列化进一步减少了序列化的大小和内存使用。进一步减少内存使用还可以通过压缩来实现(查看Spark的配置参数spark.rdd.compress),代价是cpu时间。
2.清理旧数据:默认情况下,所有的输入数据和持久化的RDD是自动清理的。Spark Streaming根据使用的转换操作来决定何时清理数据。例如,你正在使用10分钟的窗口操作,则Spark Streaming将保留最近10分钟的数据,并主动丢弃旧数据。通过设置streamingcontext.remember,数据可以保留更长的时间。
3.CMS垃圾回收器:强烈建议使用CMS(concurrent mark-and sweep,并发标记扫描)GC,以使GC相关的暂停持续低水平。尽管并发GC会降低系统的整体吞吐量,但仍推荐使用并行GC来实现更加一致的批处理时间。确保在驱动程序上(在spark-submit中使用 --driver-java-options )和executor上(使用Spark配置参数spark.executor.extraJavaOptions)设置了CMS GC。
4.其他技巧:为了进一步减少GC开销,这里有更多的技巧值得尝试。
①用StorageLevel.OFF_HEAP存储级别持久化RDD。(useDisk,useMemory,useOffHeap,not deserialized,1 replication)
②使用更多的executor和较小的堆大小。这会降低每个JVM 堆内的GC压力。
需要记住的几点:
1.每个DStream都与一个接收器相关联(除了文件流)。为了并行读取数据,需要创建多个接收器,即多个DStream。接收器在executor上运行,它占据一个内核。确保除去接收器所占内核之后,还有足够的内核来处理数据。spark.cores.max应该把接收器的内核也算在内。接收器以循环的方式分配给executor。
2.当从数据源接收数据时,接收器创建数据块。每隔块间隔(blockInterval)都会生成一个新的数据块。在批间隔中会生成N个数据块,N=批间隔/块间隔。这些块由当前executor的块管理器分发给其他executor的块管理器。之后,驱动程序上运行的的网络输入跟踪器(Network Input Tracker)将被通知块的位置以供进一步处理。
3.An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally.
4.The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in. Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait
increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally.
5.Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling inputDstream.repartition(n)
. This reshuffles the data in RDD randomly to create n number of partitions. Yes, for greater parallelism. Though comes at the cost of a shuffle. An RDD’s processing is scheduled by driver’s jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued.
6.If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the another. To avoid this, you can union two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted.
容错语义
在本节中,我们将讨论Spark Streaming应用程序在发生故障时的行为。
背景:
为了理解Spark Streaming提供的语义,让我们记住Spark RDD的基本容错语义:
1.RDD是一个不可变的,可以重新计算的,分布式的数据集。Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.
2.如果RDD的任何分区由于工作节点故障而丢失,那么这个分区可以由原始数据集通过操作谱系重新计算出来。
3.假定RDD的所有转换操作都是确定的,最终转换的RDD中的数据总是相同的,而不管Spark集群中的故障如何。
Spark操作容错文件系统(比如说HDFS)中的数据。所以,所有由容错数据生成的RDD也应该是容错的。然而,对于Spark Streaming,情况并非如此,因为大多数情况下是通过网络接收的数据(除了文件流)。
要为所有生成的RDD实现相同的容错属性,需要为接收的数据在集群工作节点的多个executor上创建副本(默认是2个副本)。这导致系统中有两种数据在发生故障时需要恢复:
1.接收到以及创建了副本的数据。这些数据在单个工作节点故障时仍然存在,因为它在另一个工作节点上有副本。
2.Data received but buffered for replication。因为这些数据没有副本,恢复这些数据唯一的方法是从数据源再接收一次。
此外,有两种失败我们应该关注:
1.工作节点失败。运行executor的任意一个工作节点都可能会失败,此时这些节点上所有的内存中的数据都会丢失。如果在失败节点上有接收器运行,那么它们缓冲的数据将会丢失。
2.驱动程序节点失败。如果运行Spark Streaming应用的驱动程序节点失败,那么显然SparkContext会丢失,所有executor及其内存中的数据都会丢失。
定义
关于每个记录可能会被系统处理多少次,有三种可能:
1.At most once。每个记录将被处理一次或者根本不处理。
2.At least once。每个记录将被处理一到多次。这比最多一次要强一些,因为它确保不会丢失任何数据,但是可能有重复。
3.Exactly once。每个记录将被处理一次,没有数据丢失,也没有数据被重复处理。这是三个最好的。
基本语义
在任何流处理系统中,广义来讲,处理数据有三个步骤。
1.接收数据:数据通过接收器或其他方式从源接收数据。
2.转换数据:使用DStream和RDD的转换操作来转换接收到的数据。
3.输出数据:最终转换的数据被输出到外部系统,比如文件系统,数据库或者仪表盘等。
如果一个流应用程序必须实现端到端的exactly-once保证,那么每个步骤都必须提供exactly-once保证。也就是说,每个记录只能被接收一次,只能被转换一次,并被输出到下游系统一次。让我们在Spark Streaming的上下文中理解这些步骤的语义。
1.接收数据:不同的输入源提供不同的保证。在下面会讨论。
2.转换数据:由于RDD提供的保证,所有接收的数据都会被正好处理一次。即使有故障,只要接收到的输入数据是可访问的,最终转换的RDD将总是具有相同的内容。
3.输出数据:输出操作默认保证至少一次,因为它依赖于输出操作的类型(是否幂等性idempotent)以及下游系统(是否支持事务)。但是用户可以实现自己的事务机制来实现exactly-once。在下面会讨论。
接收数据的语义
不同的输入源提供不同的保证,从最少一次到刚好一次。
文件流
如果所有输入数据都在容错系统中,如HDFS,那么Spark Streaming可以从任意故障中恢复并处理所有数据。这就是刚好一次,意味着所有的数据都会被刚好处理一次,不管什么失败。
基于接收器的源
对于基于接收器的输入源,容错语义取决于故障情况和接收器的类型。正如我们之前讨论过的,有两种类型的接收器;
1.可靠接收器。这些接收器仅在接收到的数据创建完副本后才去确认通知可靠的数据源。如果这样的接收器失败了,那么数据源不会收到缓冲(未复制)数据的确认通知,所以,如果接收器重启的话,数据源将重新发送数据,并且没有数据会由于失败而丢失。
2.不可靠接收器。这些接收器不会发送确认通知,所以在工作节点或者驱动程序故障时可能会丢失数据。
如果一个工作节点失败,可靠的接收器不会丢失数据。对于不可靠的接收器,接收但是没有创建副本的数据会丢失。如果驱动程序故障,那么不管是可靠接收器还是不可靠接收器,除了上面这些丢失外,所有在内存中接收和复制的数据都会丢失,这会影响有状态转换操作的结果。 If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.???
为了避免之前接收到的数据丢失,从1.2版本开始,Spark引入了预写入日志,将接收到的数据保存到容错存储系统中。预写入日志的启用和可靠的接收器可以确保数据零丢失(即使是驱动程序故障情况下)。就语义而言,它提供了至少一次的保证。
Kafka直接API
从1.3版本开始,我们引入了一个新的Kafka Direct API,它可以确保所有的Kafka数据只被Spark Streaming接收一次。除此之外,如果你执行的是exactly-once的输入操作,那么可以实现端到端的exactly-once。参阅Kafka Integration Guide查看详情。
输出操作的语义
输出操作(比如foreachRDD)有至少一次的语义。也就是说,转换后的数据可能在工作节点失败的情况下不止一次地被写入外部实体。虽然这对于使用saveAs*HadoopFiles操作保存到文件系统是可以接受的(因为文件将被相同的数据简单地覆盖),但是可要额外的努力来实现exactly-once。有两种方法:
1.幂等更新(Idempotent updates):多次尝试总是写相同的数据。例如,saveAs*HadoopFiles总是将相同的数据写到生成的文件中。
2.事务更新(Transctional updates):所有的更新都以事务方式进行,因此更新只能以原子方式进行一次。下面是一个具体实现方法:
①使用批次时间(在foreachRDD中可用)和RDD的分区索引创建一个标识符,该标识符唯一标识流应用程序中的blob data。
②使用这个标识符事务性地更新外部系统。Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. 也就是说,如果这个标识符还没提交,就以原子方式提交分区数据和这个标识符。否则,如果该标识符已经提交的话,就跳过更新。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}