1. Connecting to MySQL
First, copy mysql-connector-java-5.1.39.jar into Spark's jars directory.
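If you would rather not modify the Spark installation, the connector can also be put on the classpath when launching the shell; a minimal sketch (the jar path is an assumption):

spark-shell --jars /path/to/mysql-connector-java-5.1.39.jar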
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val sqlContext=new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3a649f9a
scala> sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/metastore",
| "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "DBS", "user" -> "root", "password" -> "root")).load().show
+-----+--------------------+--------------------+-------+----------+----------+
|DB_ID| DESC| DB_LOCATION_URI| NAME|OWNER_NAME|OWNER_TYPE|
+-----+--------------------+--------------------+-------+----------+----------+
| 1|Default Hive data...|hdfs://localhost:...|default| public| ROLE|
| 2| null|hdfs://localhost:...| aaa| root| USER|
| 6| null|hdfs://localhost:...| userdb| root| USER|
+-----+--------------------+--------------------+-------+----------+----------+
-----------------------------------------------------------------------------------------------------------------
scala> import org.apache.spark.sql.{SQLContext,SparkSession}
import org.apache.spark.sql.{SQLContext, SparkSession}
scala> val url="jdbc:mysql://localhost:3306/test?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"
url: String = jdbc:mysql://localhost:3306/test?user=root&password=root&useUnicode=true&characterEncoding=UTF-8
scala> val con = new SQLContext(sc);
warning: there was one deprecation warning; re-run with -deprecation for details
con: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3a973b5e
scala> con.read.format("jdbc").options(Map("url"->url,"dbtable"->"role")).load.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 651|wang|2018-06-06 16:16:55| shenzhen| 1| 60|
+------+----+-------------------+-------------------+---+-----+
scala> con.read.format("jdbc").option("url",url).option("dbtable","role").load.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 651|wang|2018-06-06 16:16:55| shenzhen| 1| 60|
+------+----+-------------------+-------------------+---+-----+
scala> val session=SparkSession.builder.getOrCreate()
session: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@71e393a2
scala> session.read.format("jdbc").options(Map("url"->url,"dbtable"->"role")).load.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 651|wang|2018-06-06 16:16:55| shenzhen| 1| 60|
+------+----+-------------------+-------------------+---+-----+
scala> session.read.format("jdbc").option("url",url).option("dbtable","role").load.show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 651|wang|2018-06-06 16:16:55| shenzhen| 1| 60|
+------+----+-------------------+-------------------+---+-----+
scala> import java.util.Properties
import java.util.Properties
scala> val pro=new Properties()
pro: java.util.Properties = {}
scala> session.read.jdbc(url,"role",pro).show
+------+----+-------------------+-------------------+---+-----+
|roleid|name| dateid| addr|sex|level|
+------+----+-------------------+-------------------+---+-----+
| 1|null|2017-11-16 14:49:11|henan luohe linying| 1| 10|
| 40|null|2017-11-13 14:50:25| guangdong shenzhen| 1| 20|
| 110|null|2017-11-14 14:50:47| beijing| 1| 20|
| 200|null|2017-11-14 14:49:47| shandong qingdao| 0| 8|
| 400|null|2017-11-15 14:49:56| anhui hefei| 0| 4|
| 600|null|2017-11-15 14:50:05| hunan changsha| 0| 91|
| 650|null|2017-11-01 17:24:34| null| 1| 29|
| 651|wang|2018-06-06 16:16:55| shenzhen| 1| 60|
+------+----+-------------------+-------------------+---+-----+
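In the run above the user and password were embedded in the JDBC URL, which is why an empty Properties object was enough. Below is a minimal sketch (values are illustrative assumptions) of passing the credentials through Properties instead, plus a partitioned read that splits the table on a numeric column:

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Credentials and driver go into Properties instead of the URL (values are assumptions).
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "root")
props.setProperty("driver", "com.mysql.jdbc.Driver")

val url2 = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8"

// Same read as above, with the credentials now carried by props.
spark.read.jdbc(url2, "role", props).show()

// Partitioned read: Spark issues one query per partition over roleid,
// here 4 partitions covering the assumed key range 1..700.
spark.read.jdbc(url2, "role", "roleid", 1L, 700L, 4, props).show()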
2. Connecting to Hive. First copy Hive's configuration file hive-site.xml into Spark's conf directory, or create a new hive-site.xml under conf and add the following content
(the file copied over from Hive produced errors, so I created a new one instead)
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>root</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>/tmp/hive/tmp</value>
  </property>
  <property>
    <name>hive.querylog.location</name>
    <value>/tmp/hive/log</value>
  </property>
</configuration>
Start spark-shell:
Reading Hive with HiveContext
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala> val hivecon=new HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
hivecon: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1b96f15e
scala> hivecon.sql("show databases").show
+------------+
|databaseName|
+------------+
| aaa|
| default|
| sparkhive|
| userdb|
+------------+
--------------------------------------------
--Reading Hive with SparkSession
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val session=SparkSession.builder.getOrCreate()
session: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@331d651b
scala> session.sql("select sex,count(1) from gamedw.cust group by sex").show
+---+--------+
|sex|count(1)|
+---+--------+
| 1| 6|
| 0| 3|
+---+--------+
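spark-shell builds its SparkSession with Hive support already enabled, which is why session.sql can see the Hive tables directly. In a standalone application you have to request it yourself; a minimal sketch (the app name is an assumption, the warehouse path follows the hive-site.xml above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-hive-demo")                                 // assumed app name
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")  // matches hive-site.xml
  .enableHiveSupport()                                        // required to query Hive tables
  .getOrCreate()

spark.sql("select sex,count(1) from gamedw.cust group by sex").show()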