SparkSQL的创建方式


文章目录


hive和sparksql

Spark on Hive : Hive作为存储角色,Spark负责sql的解析优化,执行

Hive on Spark : Hive 作为存储又负责sql解析优化,Spark负责执行

DataFrame

分布式数据容器,与RDD类似,然而DataFrame像传统数据库的二维表格,除了数据外,掌握数据的结构信息,即schema,同时,与Hive类似DataFrame 也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低

DataFrame 的底层封装的是 RDD,只不过 RDD 的泛型是 Row 类型。

基本环境构建

	   SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("sql");
        SparkContext sc = new SparkContext(conf);
        //创建SQLContext上下文环境
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().format("json").load("data/json");

读取json格式

DataFrame df = sqlContext.read().format("json").load("sparksql/json");
DataFrame df2 = sqlContext.read().json("sparksql/json.txt");

使用原生的API

  df.rdd();
  df.select(df.col("name"), df.col("age").plus(10).alias("addage"));

注册临时表直接写sql

临时表不会雾化到磁盘中

df.registerTempTable("jtable");
DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");
DataFrame sql2 = sqlContext.sql("select * from jtable");

scala版本

al conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
//val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
//select * from table
df.select(df.col("name")).show()
//select name from table where age>19
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show()
//select count(*) from table group by age
df.groupBy(df.col("age")).count().show();
/**
* 注册临时表
*/
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()

非 json 格式的 RDD 创建 DataFrame

通过反射的方式将非 json 格式的 RDD 转换成 DataFrame(不建议使用)

自定义类:

1.序列化

2.RDD 转成 DataFrame 后会根据映射将字段按 Assci 码排序

3.df.getInt(0)下标获取(不推荐使用),另一种是 df.getAs(“列名”)获取(推荐使用)

JavaRDDlineRDD = sc.textFile("sparksql/person.txt");
JavaRDDpersonRDD = lineRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Person call(String s) throws Exception {
Person p = new Person();
p.setId(s.split(",")[0]);
p.setName(s.split(",")[1]);
p.setAge(Integer.valueOf(s.split(",")[2]));
return p;
}
})

/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable("person");
sqlContext.sql("select name from person where id = 2").show()

DataFrame转换成JavaRDD

1.可以使用row.getInt(0),row.getString(1)…通过下标获取返回Row类型的数据,但是要注意列顺序问题—不常用

2.可以使用row.getAs(“列名”)来获取对应的列值。

JavaRDD<Row> javaRDD = df.javaRDD();JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {private static final long serialVersionUID = 1L;@Overridepublic Person call(Row row) throws Exception {Person p = new Person();//p.setId(row.getString(1));//p.setName(row.getString(2));//p.setAge(row.getInt(0));p.setId((String)row.getAs("id"));p.setName((String)row.getAs("name"));p.setAge((Integer)row.getAs("age"));return p;}});map.foreach(new VoidFunction<Person>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Person t) throws Exception {System.out.println(t);}});sc.stop()

动态创建 Schema 将非 json 格式的 RDD 转换成 DataFrame

/**
* 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源
于外部数据库
*/
ListasList =Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
sc.stop();

scala版本
val conf = new SparkConf()conf.setMaster("local").setAppName("rddStruct")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)val lineRDD = sc.textFile("./sparksql/person.txt")val rowRDD = lineRDD.map { x => {val split = x.split(",")RowFactory.create(split(0),split(1),Integer.valueOf(split(2)))} }val schema = StructType(List(StructField("id",StringType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)))val df = sqlContext.createDataFrame(rowRDD, schema)df.show()df.printSchema()sc.stop(

读取 parquet 文件创建 DataFrame

将DataFrame存储成parquet文件,两种方式

df.write().mode(SaveMode.Overwrite)format("parquet")
.save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

SaveMode指定文件保存时的模式

Overwrite:覆盖

Append:追加

ErrorIfExists:如果存在就报错

Ignore:如果存在就忽略

java版本

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDDjsonRDD = sc.textFile("sparksql/json");
DataFrame df = sqlContext.read().json(jsonRDD);
/**
* 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式
* 保存成parquet文件有以下两种方式:
*/
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/
parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
df.show();
/**
* 加载parquet文件成DataFrame
* 加载parquet文件有以下两种方式:
*/
DataFrame  load  =
sqlContext.read().format("parquet").load("./sparksql/parquet");
load = sqlContext.read().parquet("./sparksql/parquet");
load.show();
sc.stop();

scala版本

val conf = new SparkConf()conf.setMaster("local").setAppName("parquet")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)val jsonRDD = sc.textFile("sparksql/json")val df = sqlContext.read.json(jsonRDD)df.show()/**
* 将DF保存为parquet文件
*/df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/pa
rquet")df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")/**
* 读取parquet文件
*/var result = sqlContext.read.parquet("./sparksql/parquet")result = sqlContext.read.format("parquet").load("./sparksql/parquet")result.show()sc.stop()

读取 JDBC 中的数据创建 DataFrame(MySql 为例)

第一种方式读取MySql数据库表,加载为DataFrame

Mapoptions = new HashMap();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();
person.show();
person.registerTempTable("person");

第二种方式读取MySql数据表加载为DataFrame

DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();
score.show();
score.registerTempTable("score");
DataFrame result =
sqlContext.sql("select person.id,person.name,score.score from person,score
where person.name = score.name");
result.show();

将DataFrame结果保存到Mysql中

Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:330
6/spark", "result", properties);
sc.stop()

scala

val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
/**
* 第一种方式读取Mysql数据库表创建DF
*/
val options = new HashMap[String,String]();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password", "123456")
options.put("dbtable","person")
val person = sqlContext.read.format("jdbc").options(options).load()
person.show()

person.registerTempTable("person")
/**
* 第二种方式读取Mysql数据库表创建DF
*/
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark")
reader.option("driver","com.mysql.jdbc.Driver")
reader.option("user","root")
reader.option("password","123456")
reader.option("dbtable", "score")
val score = reader.load()
score.show()
score.registerTempTable("score")
val result = sqlContext.sql("select person.id,person.name,score.score from
person,score where person.name = score.name")
result.show()
/**
* 将数据写入到Mysql表中
*/
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.4:3306/spa
rk", "result", properties)
sc.stop()

               

上一篇:Spark的Parquet向量化读取原理


下一篇:hive支持的数据类型和存储格式