1. Prepare the data file employee.txt
1001,Gong Shaocheng,1
1002,Li Dachao,1
1003,Qiu Xin,1
1004,Cheng Jiangzhong,2
1005,Wo Binggang,3
Upload the data to HDFS
[root@jfp3-1 spark-studio]# hdfs dfs -put employee.txt /user/spark_studio
2. Start the Spark shell
[root@jfp3-1 spark-1.0.0-bin-hadoop2]# ./bin/spark-shell --master spark://192.168.0.71:7077
3. Write the script
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// employeeId is a String: split(",") yields strings and p(0) is passed through unconverted.
case class Employee(employeeId: String, name: String, departmentId: Int)

// Create an RDD of Employee objects and register it as a table.
val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
employees.registerAsTable("employee")

// SQL statements can be run by using the sql methods provided by sqlContext.
val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
fsis.map(t => "Name: " + t(0)).collect().foreach(println)
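The parsing step in the script can be tried without a cluster. The following is a minimal plain-Scala sketch, not the Spark job itself: `ParseEmployees` and `parseLine` are hypothetical names introduced here, and the in-memory `Seq` stands in for the HDFS file. `employeeId` is kept as a `String`, as in the session in step 4, because `split(",")` returns strings.

```scala
// Plain-Scala sketch of the parsing step; no Spark required.
case class Employee(employeeId: String, name: String, departmentId: Int)

object ParseEmployees {
  // Turn one comma-separated line into an Employee.
  // Only the department field is converted; trim guards against stray spaces.
  def parseLine(line: String): Employee = {
    val p = line.split(",")
    Employee(p(0), p(1), p(2).trim.toInt)
  }

  def main(args: Array[String]): Unit = {
    // Two sample lines taken from employee.txt.
    val lines = Seq(
      "1001,Gong Shaocheng,1",
      "1004,Cheng Jiangzhong,2"
    )
    val employees = lines.map(parseLine)
    // Same predicate as the SQL query: departmentId = 1.
    employees.filter(_.departmentId == 1).foreach(e => println("Name: " + e.name))
  }
}
```

In the Spark version the same function body runs inside `map` on the RDD, and the filter is expressed in SQL instead of as a Scala predicate.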
4. Run it
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@17124319

scala> import sqlContext._
import sqlContext._

scala> case class Employee(employeeId: String, name: String, departmentId: Int)
defined class Employee

scala> val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
14/06/18 09:54:25 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=309225062
14/06/18 09:54:25 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 294.8 MB)
employees: org.apache.spark.rdd.RDD[Employee] = MappedRDD[3] at map at <console>:19

scala> employees.registerAsTable("employee")

scala> val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
fsis: org.apache.spark.sql.SchemaRDD = SchemaRDD[6] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#1:1]
 Filter (departmentId#2:2 = 1)
  ExistingRdd [employeeId#0,name#1,departmentId#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174

scala> fsis.map(t => "Name: " + t(0)).collect().foreach(println)
14/06/18 09:55:27 INFO FileInputFormat: Total input paths to process : 1
14/06/18 09:55:27 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 09:55:27 INFO DAGScheduler: Got job 0 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 09:55:27 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:20)
14/06/18 09:55:27 INFO DAGScheduler: Parents of final stage: List()
14/06/18 09:55:27 INFO DAGScheduler: Missing parents: List()
14/06/18 09:55:27 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at map at <console>:20), which has no missing parents
14/06/18 09:55:27 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[9] at map at <console>:20)
14/06/18 09:55:27 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:0 as 3508 bytes in 2 ms
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:1 as 3508 bytes in 0 ms
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 1 in 1266 ms on jfp3-3 (progress: 1/2)
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 0 in 1276 ms on jfp3-2 (progress: 2/2)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 1)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/18 09:55:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/18 09:55:28 INFO DAGScheduler: Stage 0 (collect at <console>:20) finished in 1.284 s
14/06/18 09:55:28 INFO SparkContext: Job finished: collect at <console>:20, took 1.386154401 s
Name: Gong Shaocheng
Name: Li Dachao
Name: Qiu Xin
5. Save the data in Parquet format and run SQL on it
scala> val parquetFile = sqlContext.parquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
parquetFile: org.apache.spark.sql.SchemaRDD = SchemaRDD[13] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [employeeId#9,name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None

scala> parquetFile.registerAsTable("parquetFile")

scala> val telcos = sql("SELECT name FROM parquetFile WHERE departmentId = 3")
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/06/18 10:24:37 INFO MemoryStore: ensureFreeSpace(180579) called with curMem=138763, maxMem=309225062
14/06/18 10:24:37 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 176.3 KB, free 294.6 MB)
telcos: org.apache.spark.sql.SchemaRDD = SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None

scala> telcos.collect().foreach(println)
14/06/18 10:24:40 INFO FileInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetFileReader: reading summary file: hdfs://jfp3-1:8020/user/spark_studio/employee.parquet/_metadata
14/06/18 10:24:40 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/06/18 10:24:40 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/06/18 10:24:40 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 10:24:40 INFO DAGScheduler: Got job 2 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 10:24:40 INFO DAGScheduler: Final stage: Stage 2(collect at <console>:20)
14/06/18 10:24:40 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:24:40 INFO DAGScheduler: Missing parents: List()
14/06/18 10:24:40 INFO DAGScheduler: Submitting Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98 == Query Plan == Project [name#10:0] Filter (departmentId#11:1 = 3) ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None), which has no missing parents
14/06/18 10:24:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98 == Query Plan == Project [name#10:0] Filter (departmentId#11:1 = 3) ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None)
14/06/18 10:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:0 as TID 4 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:0 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: jfp3-4 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:1 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO DAGScheduler: Completed ResultTask(2, 0)
14/06/18 10:24:40 INFO TaskSetManager: Finished TID 4 in 200 ms on jfp3-3 (progress: 1/2)
14/06/18 10:24:42 INFO DAGScheduler: Completed ResultTask(2, 1)
14/06/18 10:24:42 INFO TaskSetManager: Finished TID 5 in 2162 ms on jfp3-4 (progress: 2/2)
14/06/18 10:24:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/06/18 10:24:42 INFO DAGScheduler: Stage 2 (collect at <console>:20) finished in 2.177 s
14/06/18 10:24:42 INFO SparkContext: Job finished: collect at <console>:20, took 2.210887848 s
[Wo Binggang]
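The transcript above reads employee.parquet, but the step that wrote it is not shown. In Spark 1.0.x a SchemaRDD can be written out with saveAsParquetFile, so the file was presumably produced along the following lines. This is a sketch under that assumption, meant to be run in the same spark-shell session before the parquetFile call; the path is the one from the transcript, and the source does not confirm that the author ran exactly this statement.

```scala
// Assumption: run inside the spark-shell session from step 4 (Spark 1.0.x),
// where `import sqlContext._` is in effect and `employees` is the RDD[Employee].
// The implicit conversion brought in by that import turns the RDD into a SchemaRDD,
// which is the type that provides saveAsParquetFile.
employees.saveAsParquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
```

Parquet stores the schema with the data, which is why the reading side needs no case class: the column names employeeId, name and departmentId appear directly in the ParquetTableScan plan.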
6. DSL syntax support
scala> all.collect().foreach(println)
14/06/18 10:37:45 INFO SparkContext: Starting job: collect at <console>:24
14/06/18 10:37:45 INFO DAGScheduler: Got job 6 (collect at <console>:24) with 2 output partitions (allowLocal=false)
14/06/18 10:37:45 INFO DAGScheduler: Final stage: Stage 6(collect at <console>:24)
14/06/18 10:37:45 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:37:45 INFO DAGScheduler: Missing parents: List()
14/06/18 10:37:45 INFO DAGScheduler: Submitting Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98 == Query Plan == Project [name#19:1] Filter (departmentId#20:2 >= 1) ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174), which has no missing parents
14/06/18 10:37:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98 == Query Plan == Project [name#19:1] Filter (departmentId#20:2 >= 1) ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:0 as TID 200 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:0 as 3541 bytes in 0 ms
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:1 as TID 201 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:1 as 3541 bytes in 1 ms
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 200 in 33 ms on jfp3-3 (progress: 1/2)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 0)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 1)
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 201 in 37 ms on jfp3-2 (progress: 2/2)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
14/06/18 10:37:45 INFO DAGScheduler: Stage 6 (collect at <console>:24) finished in 0.039 s
14/06/18 10:37:45 INFO SparkContext: Job finished: collect at <console>:24, took 0.052556716 s
[Gong Shaocheng]
[Li Dachao]
[Qiu Xin]
[Cheng Jiangzhong]
[Wo Binggang]
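The transcript does not show how `all` was defined. Judging from the query plan in the log (a projection on name over a filter departmentId >= 1 on the existing RDD), it was presumably built with the language-integrated query DSL of Spark 1.0.x rather than a SQL string. The definition below is a reconstruction under that assumption, not the author's confirmed code.

```scala
// Assumption: run inside the spark-shell session from step 4 (Spark 1.0.x),
// where `import sqlContext._` is in effect and `employees` is the RDD[Employee].
// Scala symbols such as 'departmentId refer to columns; the import supplies the
// implicit conversions that make `>=` build a filter expression.
val all = employees.where('departmentId >= 1).select('name)
all.collect().foreach(println)
```

Every row in employee.txt has a departmentId of at least 1, which matches the five names printed above.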