1.安装启动检查 MySQL 服务。
# Confirm the MySQL server is up and listening on its default port (3306).
# The original "(3306)" was a prose note, not valid shell syntax; filter with grep instead.
netstat -tunlp | grep 3306
2.spark 连接mysql驱动程序。
# Locate the MySQL JDBC connector jar bundled with Hive ...
cd /usr/local/hive/lib
ls mysql*
# ... and copy it into Spark's jars directory so Spark can speak JDBC to MySQL.
cp mysql-connector-java-5.1.40-bin.jar /usr/local/spark/jars
# Verify the jar is now visible to Spark.
cd /usr/local/spark/jars
ls mysql*
3.启动 MySQL shell，新建数据库 spark 和表 student。
-- Create the demo database and the student table, then seed it with three rows.
CREATE DATABASE spark;
USE spark;

-- INT display widths like int(4) are deprecated (MySQL 8+) and carry no meaning;
-- VARCHAR avoids the trailing-space padding semantics of CHAR.
CREATE TABLE student (
    id     INT,
    name   VARCHAR(20),
    gender VARCHAR(4),
    age    INT
);

-- NOTE: the original used typographic curly quotes (‘…‘), which MySQL rejects;
-- string literals must use straight single quotes. Explicit column lists make the
-- inserts robust to future schema changes.
INSERT INTO student (id, name, gender, age) VALUES (1, 'Xueqian', 'F', 23);
INSERT INTO student (id, name, gender, age) VALUES (2, 'Weiliang', 'M', 24);
INSERT INTO student (id, name, gender, age) VALUES (3, 'Rongcheng', 'M', 26);

-- Sanity check the seed data (original was missing the space in "select *from").
SELECT * FROM student;
4.spark读取MySQL数据库中的数据
from pyspark.sql.types import *

# Read the `student` table from MySQL over JDBC and display it.
# NOTE: the original used typographic curly quotes (‘…‘), which are a Python
# syntax error; they must be straight quotes. `spark` is the SparkSession
# provided by the pyspark shell.
(spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "student")
    .option("user", "root")
    .option("password", "hadoop")
    .load()
    .show())
5.spark向MySQL数据库写入数据
from pyspark.sql.types import *
from pyspark.sql import Row

# Build a one-row DataFrame for student (4, 'Guanhua', 'M', 27) and append it
# to the MySQL `student` table over JDBC, then read the table back to verify.

# Parse the raw "id name gender sage" string into fields.
studentRdd = spark.sparkContext.parallelize(["4 Guanhua M 27"]).map(lambda line: line.split(" "))

# Convert each field list into a typed Row (id/age as int, names stripped).
rowRdd = studentRdd.map(lambda p: Row(int(p[0]), p[1].strip(), p[2].strip(), int(p[3])))

# Explicit schema matching the MySQL table definition.
bt = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
])
student_b = spark.createDataFrame(rowRdd, bt)

# JDBC connection properties.
# NOTE: the original used curly quotes (‘…‘) here, which are a Python syntax error.
prop = {
    "user": "root",
    "password": "hadoop",
    "driver": "com.mysql.jdbc.Driver",
}

# "append" preserves the existing rows; "overwrite" would drop and recreate the table.
student_b.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false", "student", "append", prop)

# Read the table back to confirm the new row was written.
(spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/spark?useSSL=false")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "student")
    .option("user", "root")
    .option("password", "hadoop")
    .load()
    .show())