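Users:
Make it easy to load data quickly from different data sources (json, parquet, rdbms), process them together (e.g. join json with parquet),
and write the result back in a chosen format (json, parquet) to a target system (HDFS, S3).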
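Spark SQL 1.2 ==> External Data Source API
Purpose of external data sources
1) Developers: does the code for a new data source have to be merged into Spark itself? No.
e.g. a weibo data source can be built as its own jar
and added at submit time with --jars
2) Users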
Read: spark.read.format(format)
format
built-in: json parquet jdbc csv (Spark 2.0+)
packages: external, not built into Spark; see https://spark-packages.org/
Write: people.write.format("parquet").save("path")
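The read/write pattern above can be sketched as a small round trip. This is only an illustration: the object name FormatRoundTrip, the local[*] master and the temp directory (standing in for HDFS/S3) are assumptions, not part of the original notes.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object FormatRoundTrip {
  // Writes a tiny DataFrame as JSON, reads it back, converts it to Parquet,
  // and returns the number of rows read from the Parquet copy.
  def run(): Long = {
    val spark = SparkSession.builder()
      .appName("FormatRoundTrip")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    import spark.implicits._

    // Assumption: a throwaway temp directory stands in for HDFS/S3.
    val base = Files.createTempDirectory("datasource-demo").toString

    val people = Seq(("Michael", 29), ("Andy", 30)).toDF("name", "age")
    people.write.format("json").save(s"$base/people_json")

    // Read: spark.read.format(format).load(path)
    val fromJson = spark.read.format("json").load(s"$base/people_json")

    // Write: df.write.format(format).save(path)
    fromJson.write.format("parquet").save(s"$base/people_parquet")

    val rows = spark.read.format("parquet").load(s"$base/people_parquet").count()
    spark.stop()
    rows
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Only the string passed to format(...) changes between the JSON and Parquet steps; the rest of the call chain stays the same.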
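Working with Parquet data
Loading a JSON file without specifying format("json") fails, because the default data source is parquet:
RuntimeException: file:/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file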
val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.stringConf
.createWithDefault("parquet")
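A minimal sketch of what this default means in practice: when format(...) is left out, both save() and load() fall back to spark.sql.sources.default. The object name DefaultSourceDemo, the sample rows and the temp path are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object DefaultSourceDemo {
  // Returns the configured default source and the row count read back via load().
  def run(): (String, Long) = {
    val spark = SparkSession.builder()
      .appName("DefaultSourceDemo")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    import spark.implicits._

    // Assumption: a throwaway temp directory used as the output location.
    val path = Files.createTempDirectory("default-source-demo").toString + "/users"
    val defaultSource = spark.conf.get("spark.sql.sources.default")

    // No format(...) on either side: both calls use the default data source.
    Seq(("Alyssa", "red"), ("Ben", "blue")).toDF("name", "favorite_color").write.save(path)
    val rows = spark.read.load(path).count()

    spark.stop()
    (defaultSource, rows)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

To read a non-Parquet file, name the format explicitly, e.g. spark.read.format("json").load(path).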
# Note the use of USING
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
)
SELECT * FROM parquetTable
spark.sql("select deptno, count(1) as mount from emp group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")
Without the alias (as mount) on count(1), saveAsTable fails with:
org.apache.spark.sql.AnalysisException: Attribute name "count(1)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
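To see ahead of time which column names would trigger this error, the character list from the message can be checked directly. This is a plain-Scala sketch, not Spark's own validation code; AliasCheck and needsAlias are hypothetical names.

```scala
object AliasCheck {
  // Characters listed in the AnalysisException message.
  private val invalidChars = " ,;{}()\n\t="

  // True when a column name contains one of those characters and needs an alias.
  def needsAlias(columnName: String): Boolean =
    columnName.exists(c => invalidChars.indexOf(c) >= 0)

  def main(args: Array[String]): Unit = {
    Seq("deptno", "count(1)", "mount").foreach { name =>
      println(s"$name -> needsAlias = ${needsAlias(name)}")
    }
  }
}
```

Aggregate expressions such as count(1) always contain parentheses, so they need an alias before being saved as a table.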
spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")
In production, be sure to set spark.sql.shuffle.partitions deliberately; the default is 200.
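The same setting can also be changed through spark.conf on a SparkSession. The sketch below only reads the value before and after the change; the object name ShufflePartitionsDemo and the value 10 are illustrative assumptions, and the right number depends on data volume and cluster size.

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsDemo {
  // Returns the setting before and after overriding it.
  def run(): (String, String) = {
    val spark = SparkSession.builder()
      .appName("ShufflePartitionsDemo")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()

    val before = spark.conf.get("spark.sql.shuffle.partitions")
    // Affects shuffles (joins, aggregations) planned after this point.
    spark.conf.set("spark.sql.shuffle.partitions", "10")
    val after = spark.conf.get("spark.sql.shuffle.partitions")

    spark.stop()
    (before, after)
  }

  def main(args: Array[String]): Unit = println(run())
}
```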
Working with MySQL data:
spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/hive").option("dbtable", "hive.TBLS").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
If the driver option is left out and no JDBC driver is registered for the URL, load() fails with:
java.sql.SQLException: No suitable driver
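One way to check for this condition outside Spark is to ask java.sql.DriverManager whether any registered driver accepts the URL. This sketch assumes the error originates from that lookup; DriverCheck and hasDriverFor are hypothetical names.

```scala
import java.sql.DriverManager
import scala.util.Try

object DriverCheck {
  // True when some JDBC driver registered with DriverManager accepts the URL.
  // DriverManager.getDriver throws SQLException when none does.
  def hasDriverFor(url: String): Boolean =
    Try(DriverManager.getDriver(url)).isSuccess

  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306"
    println(s"driver available for $url: ${hasDriverFor(url)}")
  }
}
```

If this prints false for the MySQL URL, the connector jar is not visible to the JVM; supplying it (for example with --jars) and naming the class in the driver option, as in the examples here, are the usual remedies.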
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306",
dbtable "hive.TBLS",
user 'root',
password 'root',
driver 'com.mysql.jdbc.Driver'
)
External data sources: a combined example
create database spark;
use spark;
CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14),
LOC VARCHAR(13));
INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');