import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
  val session: SparkSession = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
  val sc: SparkContext = session.sparkContext
  sc.setLogLevel("ERROR")

  // DataFrame = data + metadata (the schema)
  // A Spark Dataset can be processed collection-style with RDD-like operators,
  // or declaratively through the SQL domain-specific language.
  val dataLists: RDD[String] = sc.textFile("data/person.txt")

  // The RDD built here fixes the element type used below; the data could also
  // be loaded as a Dataset instead.
  val rddRow = dataLists
    .map(_.split(" "))
    .map(arr => Row(arr(0), arr(1).toInt))

  val fields = Array(
    StructField("name", DataTypes.StringType, nullable = true),
    StructField("age", DataTypes.IntegerType, nullable = true)
  )
  val schema = StructType(fields)

  val dataFrame = session.createDataFrame(rddRow, schema)
  dataFrame.show()
+--------+---+
| name|age|
+--------+---+
|zhangsan| 18|
| lisi| 22|
| wangwu| 99|
| xiaoke| 22|
+--------+---+
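As a side note, the explicit StructField array above is not the only way to spell this schema. A minimal sketch using Spark's DDL-string parser (StructType.fromDDL, available since Spark 2.x; schemaFromDDL and dataFrame2 are names introduced here for illustration):

// Sketch: the same two-column schema expressed as a DDL string.
val schemaFromDDL: StructType = StructType.fromDDL("name STRING, age INT")
val dataFrame2 = session.createDataFrame(rddRow, schemaFromDDL)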
dataFrame.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
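The comment in the code above mentions that the file could also be loaded as a Dataset rather than an RDD of Row. A minimal sketch of that route, assuming the same space-separated layout of data/person.txt; Person is a case class introduced here for illustration and must be defined at the top level (not inside main) so Spark can derive an encoder for it:

import org.apache.spark.sql.Dataset

case class Person(name: String, age: Int)

// Inside main, after the session is created:
import session.implicits._
val personDS: Dataset[Person] = sc.textFile("data/person.txt")
  .map(_.split(" "))
  .map(arr => Person(arr(0), arr(1).toInt))
  .toDS()
personDS.printSchema() // same schema, inferred by reflection instead of declared by hand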
  dataFrame.createTempView("person") // register the DataFrame as a temporary view named "person"
  session.sql("select * from person where name = 'xiaoke'").show()
}
+------+---+
| name|age|
+------+---+
|xiaoke| 22|
+------+---+
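The same filter can also be written collection-style with the DataFrame DSL instead of SQL, as the comment at the top of the example notes. A minimal equivalent sketch, assuming the dataFrame built above:

import org.apache.spark.sql.functions.col

// DSL equivalent of: select * from person where name = 'xiaoke'
dataFrame.filter(col("name") === "xiaoke").show()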