Spark Structured Streaming: Partitioning Persisted Output by Data Fields

Option 1 (using a ForeachWriter sink):

import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration._

val query = wordCounts.writeStream
  .trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }
    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(",") + "\n")
    }
    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
  }).start()
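
Note that this variant splits the output by Spark task partitionId, not by a data field. To group files by a column value, the writer has to route each row to a path derived from that value. A minimal sketch, assuming (hypothetically) that the first column of the Row holds the partition key:

import java.io.{File, FileWriter}
import scala.collection.mutable
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical variant: keep one FileWriter per distinct key value,
// where the key is assumed to be the Row's first column.
new ForeachWriter[Row] {
  private val writers = mutable.Map[String, FileWriter]()

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    val key = value.getString(0)                        // assumed partition field
    val writer = writers.getOrElseUpdate(key, {
      val dir = new File(s"/tmp/example/$key")
      FileUtils.forceMkdir(dir)
      new FileWriter(new File(dir, "part-temp"), true)  // append mode
    })
    writer.append(value.toSeq.mkString(",") + "\n")
  }

  override def close(errorOrNull: Throwable): Unit = writers.values.foreach(_.close())
}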

Option 2 (using ds.writeStream().partitionBy("field")):

import org.apache.spark.sql.streaming.ProcessingTime

val query =
streamingSelectDF
.writeStream
.format("parquet")
.option("path", "/mnt/sample/test-data")
.option("checkpointLocation", "/mnt/sample/check")
.partitionBy("zip", "day")
.trigger(ProcessingTime("25 seconds"))
.start()
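
With partitionBy, the parquet sink lays files out under the output path as zip=<value>/day=<value>/part-*.parquet, and the partition columns are recovered from those directory names on read. A minimal read-back sketch, assuming the same path as above (the filter value is only illustrative); filtering on a partition column lets Spark prune whole directories instead of scanning every file:

import org.apache.spark.sql.functions.col

// Read the partitioned output back; zip and day come from the directory names.
val savedDF = spark.read.parquet("/mnt/sample/test-data")

// The filter on the partition column "day" is resolved by directory pruning.
savedDF.filter(col("day") === "20180101").show()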

Java code:

// Write new data to Parquet files; the format can also be "orc", "json", "csv", etc.
// Requires: import org.apache.spark.sql.streaming.Trigger and java.util.concurrent.TimeUnit.
String hdfsFileFormat = SparkHelper.getInstance().getLTEBaseSaveHdfsFileFormat();
String queryName = "save" + this.getTopicEncodeName(topicName) + "DataToHdfs";
String saveHdfsPath = SparkHelper.getInstance().getLTEBaseSaveHdfsPath();
// The output path is partitioned by scan_start_time (format: yyyyMMddHH0000)
dsParsed.writeStream()
        .format(hdfsFileFormat)
        .option("path", saveHdfsPath + topicName + "/")
        .option("checkpointLocation", this.checkPointPath + queryName + "/")
        .outputMode("append")
        .partitionBy("scan_start_time")
        .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
        .start();
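
The partition column has to exist on the dataset before writeStream is called. A minimal sketch of deriving such an hourly scan_start_time value from an assumed event_time timestamp column (shown in Scala for brevity; the Java Dataset API is analogous, and both the column name event_time and the bucket format are assumptions):

import org.apache.spark.sql.functions.{col, concat, date_format, lit}

// Build a partition key like 20180101090000: the hour comes from the
// assumed "event_time" timestamp, the trailing "0000" is a fixed literal.
val dsWithPartition = dsParsed.withColumn(
  "scan_start_time",
  concat(date_format(col("event_time"), "yyyyMMddHH"), lit("0000")))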

For more approaches, see 《在Spark结构化流readStream、writeStream 输入输出,及过程ETL》.
