一:意义
1.意义
如果可以实现这个功能,就可以使用spark代替sqoop,功能程序就实现这个功能。
二:hive操作
1.准备数据
启动hive
否则报错,因为在hive与spark集成的时候,配置过配置项。
后来,又看见这个文档,感觉很好的解释了我存在的问题:https://blog.csdn.net/freedomboy319/article/details/44828337
2.新建部门员工表
-》创建员工表
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/emp.txt' into table emp; -》部门表
create table dept(
deptno int,
dname string,
loc string
)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/dept.txt' into table dept;
3.效果
三:程序
1.大纲
2.前提
需要hive-site.xml
3.需要的依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.4</version>
</dependency>
4.报错如下
Exception in thread "main" java.sql.SQLNonTransientConnectionException: CLIENT_PLUGIN_AUTH is required
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:550)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:537)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:527)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:512)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:480)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:498)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:494)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1634)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:637)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:351)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:224)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
at com.scala.it.HiveToMysql$.main(HiveToMysql.scala:28)
at com.scala.it.HiveToMysql.main(HiveToMysql.scala)
Caused by: com.mysql.cj.core.exceptions.UnableToConnectException: CLIENT_PLUGIN_AUTH is required
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
at com.mysql.cj.mysqla.io.MysqlaProtocol.rejectConnection(MysqlaProtocol.java:319)
at com.mysql.cj.mysqla.authentication.MysqlaAuthenticationProvider.connect(MysqlaAuthenticationProvider.java:207)
at com.mysql.cj.mysqla.io.MysqlaProtocol.connect(MysqlaProtocol.java:1361)
at com.mysql.cj.mysqla.MysqlaSession.connect(MysqlaSession.java:132)
at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1754)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1624)
... 8 more
原因:
mysql-connect版本不匹配,换5.1.17版本。
5.程序
package com.scala.it import java.util.Properties import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext} object HiveToMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("hive-yo-mysql")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new HiveContext(sc)
val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
val props = new Properties()
props.put("user", username)
props.put("password", password) // ==================================
// 第一步:同步hive的dept表到mysql中
sqlContext
.read
.table("hadoop09.dept") // database.tablename
.write
.mode(SaveMode.Overwrite) // 存在覆盖
.jdbc(url, "mysql_dept", props)
}
}
6.效果