5. 导入Java依赖要使用SparkSQL的API,首先要导入Scala,Spark,SparkSQL的依赖:<properties><scala.version>2.11.8</scala.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.0.2</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.32</version></dependency><!-- spark sql 依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.0.2</version></dependency></dependencies>6. Java代码操作DataFrame1.DataFrame作为SparkSQL的核心API,它是通过SparkContext来获取,代码如下: //1.创建spark session,并指定appName,需要将任务提交到哪里。val spark = newSparkSession.Builder().appName("CaseClassSchema").master("local[2]").getOrCreate()//2.获取SparkContext,后面所有的SparkSQL操作都需要该上下文。val sc: SparkContext = spark.sparkContext上文中,master指定的是sparkSQL的执行环境,可以是集群也可以是本地,这里的local[2]指定的是本地单机运行模式,使用2条线程来执行任务,注意这里local必须是小写的。SparkSession是SparkContext的升级版,他支持HiveContext和SparkContext。2.通过SparkContext后我们可以获取到数据对应的DataFrame,代码如下://3.获取每一行内容的RDD,并通过schema将RDD转化成DFval lineRdd: RDD[Array[String]] =sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", "))val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt))import spark.implicits._val peopleDF: DataFrame = peopleRdd.toDF//4.对DF进行操作peopleDF.printSchema()peopleDF.show()println(peopleDF.head())println(peopleDF.count())peopleDF.columns.foreach(println)使用DataFrame之前,必须导包,否则无法使用 toDF 方法。3.DataFrame操作SQL有两种方式,DSL和SQL,代码如下://DSLpeopleDF.select("name","age").show()peopleDF.filter($"age">20).groupBy("name").count().show//SQLpeopleDF.createOrReplaceTempView("t_people")spark.sql("select * from t_people order by age desc").show4.SQL操作完毕后必须关闭sparkContext和SparkSession,代码分别是 sc.stop() 和 spark.stop()除了读取普通文件,还可以读取mysql orcale数据,代码如下:val properties = new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")//重点代码,连接JDBCval ipLocationDF: DataFrame =spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation","iplocation",properties)ipLocationDF.printSchemaipLocationDF.show7. 保存DataFrame的结果除了读取数据,DataFrame还提供了一整套保存处理数据结果的机制,代码如下:object SparkSqlToMysql {def main(args: Array[String]): Unit = {val sc: SparkContext = ...//1.通过spark session读取json数据,并返回DataFrameval peopleDF: DataFrame = spark.read.json(args(0))//2.将DataFrame注册为t_people表,并对该表进行SQL语句操作peopleDF.createOrReplaceTempView("t_people")val resultDF = spark.sql("select * from t_people")//3.对上个SQL语句的操作结果进行保存val properties = new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation","spark_save_result",properties)//close sparkcontext sparksession..}}需要注意的是resultDF.write,其返回DataFrameWriter。1. 该类可以保存任何SQL的结果,并且由于API的便利性,可以保存成多种格式,如 text ,json ,orc,csv,jdbc等。2. 对于保存的数据,系统提供了几种保存模式,可以通过mode(String)来指定:overwrite : 重写文件内部数据append : 将新增内容添加到文件末尾ignore : 如果文件已存在 则忽略操作error : default option, 如果文件存在,则抛出异常8. 总结1. 在SparkSQL系列中,我们首先介绍了SparkSQL的核心API DataFrame,DataFrame内部分为RDD基础分布式数据集和Schema元信息。DataFrame的SQL代码在执行之前会经过Catalyst优化,变成高效的处理代码。接着我们介绍了通过 spark-shell 和 java api 两种客户端窗口操作DataFrame。2. 创建DataFrame有两种方式:1. 通过 rdd.toDF 直接将rdd转换成DataFrame。2. 通过 spark.read 直接读取各种格式的数据。3. 查看DataFrame的内容有两种:1. 通过 df.printSchema 查看数据结构。2. 通过 df.show 查看数据内容。4. df提供了DSL和SQL两种风格的来操作数据。对于DSL风格,常见的方法有 select() filter() 等。5. 本文在后半部分主要介绍SparkSQL如何与mysql进行交互,除此之外,还支持Parquet,ORC,JSON,Hive,JDBC , avro协议文件等交互。 文章来源于公总号黑马程序员广州中心(itheimagz)更多资源请关注 |