spark 读写text,csv,json,parquet

以下代码演示如何用 Spark 将 text、csv、json、parquet 格式的文件读取为 DataFrame,并将 DataFrame 保存为对应格式的文件。

package com.jason.spark23

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
 * Demo of reading text, JSON, CSV and Parquet files into DataFrames with Spark,
 * and of writing a DataFrame back out as delimited text, CSV, JSON and Parquet.
 *
 * Input and output locations are hard-coded local Windows paths; adjust them
 * before running on another machine.
 */
object ReadTest {

  /**
   * Runs the read demo followed by the write demo on a local SparkSession.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readtest")
      .master("local")
      .getOrCreate()

    // Make sure the session is stopped even when a read or write fails.
    try {
      val pathjson = "C:\\notos\\code\\sparktest\\src\\main\\resources\\employees.json"
      val pathtxt = "C:\\notos\\code\\sparktest\\src\\main\\resources\\people.txt"
      val pathcsv = "C:\\notos\\code\\sparktest\\src\\main\\resources\\people.csv"
      val pathparquet = "C:\\notos\\code\\sparktest\\src\\main\\resources\\users.parquet"
      val sc = spark.sparkContext

      println("-----------------read--------------------------")

      println("====txt df")
      // Plain text carries no schema: split each line on "," and attach an
      // all-String schema by hand. Fields are trimmed so that a line such as
      // "name, age" does not leave a leading space in the second column.
      val txtrdd = sc.textFile(pathtxt)
        .map(_.split(",").map(_.trim))
        .map(arr => Row.fromSeq(arr))
      val schemaString = "name age"
      val fields = schemaString.split(" ")
        .map(fieldName => StructField(fieldName, StringType, nullable = true))
      val schema = StructType(fields)
      val txtDf = spark.createDataFrame(txtrdd, schema)
      txtDf.show()

      println("====json df")
      // The JSON reader infers the schema, including column types, automatically.
      val jsonDf = spark.read.json(pathjson)
      jsonDf.show()

      println("==== csvdf")
      // With inferSchema enabled, column types are derived from the values.
      val csvdf = spark.read.format("csv")
        .option("sep", ";")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(pathcsv)
      csvdf.show()

      println("====parquet df")
      // load() without an explicit format uses the default data source
      // (parquet unless spark.sql.sources.default is overridden).
      val usersDF = spark.read.load(pathparquet)
      usersDF.show()

      println("----------------------------------write-------------------------------")
      // No trailing separator here; each target appends its own.
      val path = "C:\\notos\\code\\sparktest\\src\\main"

      println("====txt output")
      // The "text" output is written through the csv source: a tab-separated,
      // header-less, bzip2-compressed file, since the DataFrame has several columns.
      csvdf
        .write
        .format("csv")
        .mode(SaveMode.Append)
        .options(Map("compression" -> "bzip2", "sep" -> "\t", "header" -> "false"))
        .save(s"$path\\text")

      println("====csv output")
      // SaveMode.Ignore: nothing is written when the target already exists.
      csvdf.write.mode(SaveMode.Ignore)
        .format("csv")
        .option("sep", "|")
        .option("header", "true")
        .save(s"$path\\csv")

      println("====json output")
      csvdf.write.mode(SaveMode.Append)
        .format("json")
        .save(s"$path\\json")

      println("====parquet output")
      csvdf.write.mode(SaveMode.Append)
        .format("parquet")
        .save(s"$path\\parquet")
    } finally {
      spark.stop()
    }
  }
}

上述代码将 DataFrame 保存为 text 时也采用了 csv 格式。若要使用 text 格式保存,DataFrame 中只能有一个 String 类型的字段,但一般的 DataFrame 都不止一个字段;保存为文本时还需要指定字段分隔符,这正好与 csv 的要求一致,而且 csv 格式的文件也可以用 sc.textFile 方法来读取。

上一篇:Spring3系列10- Spring AOP——Pointcut,Advisor拦截指定方法


下一篇:scp 从远程服务器上一下载文件