1. Configuration file
package config

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

case object conf {
  private val master = "local[*]"
  val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")
  // Note: http://laptop-2up1s8pr:4040/ is the Spark web UI address, not a valid master URL
  // val confs: SparkConf = new SparkConf().setMaster("http://laptop-2up1s8pr:4040/").setAppName("jobs")

  val sc = new SparkContext(confs)
  sc.setLogLevel("ERROR")

  val spark_session: SparkSession = SparkSession.builder()
    .appName("jobs")
    .config(confs)
    .getOrCreate()

  // Enable Cartesian products (cross joins); needed explicitly on Spark 2.0
  spark_session.conf.set("spark.sql.crossJoin.enabled", true)
}
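Because conf is a case object, the SparkContext and SparkSession above are initialized exactly once, on first access, and any job can simply reference them. A minimal usage sketch (the object name smokeTest is made up for illustration):

package config

object smokeTest {
  def main(args: Array[String]): Unit = {
    // First reference to conf's members triggers its one-time initialization
    println(s"Spark version: ${conf.spark_session.version}")
    // Tiny RDD round-trip to confirm the context is alive; prints 55.0
    println(conf.sc.parallelize(1 to 10).sum())
  }
}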
2. Connecting to MySQL 8.0 and working with multiple tables
package operationMysql

import config.conf.spark_session
import config.conf.spark_session.implicits._
import org.apache.spark.sql.DataFrame

object readingMysqlOperation {

  def main(args: Array[String]): Unit = {
    /*
    // Equivalent single-table read via the generic DataFrameReader API:
    val df: DataFrame = spark_session.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8")
      .option("dbtable", "订单")
      .option("user", "root")
      .option("password", "123456")
      .load()
    df.show()
    */

    // JDBC connection URL for MySQL (append &useSSL=true if SSL is required)
    val url = "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8"

    // User name and password
    val prop = new java.util.Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    // Read the 订单明细 (order detail) table, partitioned on 订单ID into 4 partitions over [0, 5000000)
    val df1: DataFrame = spark_session.read.jdbc(url, "订单明细", "订单ID", 0, 5000000, 4, prop)
    // Read the 订单 (order) table with the same partitioning
    val df2: DataFrame = spark_session.read.jdbc(url, "订单", "订单ID", 0, 5000000, 4, prop)

    // Register both DataFrames as temporary views
    df1.createOrReplaceTempView("data1")
    df2.createOrReplaceTempView("data2")

    // Run the join with spark_session.sql("...")
    val ywSQL: String =
      "SELECT dt1.`订单ID`, dt2.`客户ID`, dt1.`产品ID`, dt1.`单价`, dt1.`数量` " +
      "FROM data1 AS dt1 LEFT JOIN data2 AS dt2 ON dt1.`订单ID` = dt2.`订单ID`"
    val df: DataFrame = spark_session.sql(ywSQL)

    // Drop down to the RDD, keep 订单ID (index 0), 单价 (index 3) and 数量 (index 4),
    // then rebuild a DataFrame with descriptive column names
    df.rdd
      .map(lines => (lines(0).toString, lines(3).toString.toDouble, lines(4).toString.toInt))
      .toDF("订单ID", "产品单价", "订购数量")
      .show()
  }
}
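The same left join can also be expressed with the DataFrame API directly, with no temporary views or SQL string, and the result can be written back to MySQL through the same JDBC properties. A sketch under the same connection assumptions as above; the target table name 订单汇总 is hypothetical:

package operationMysql

import config.conf.spark_session
import org.apache.spark.sql.{DataFrame, SaveMode}

object joinWithDataFrameApi {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8"
    val prop = new java.util.Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    val detail = spark_session.read.jdbc(url, "订单明细", "订单ID", 0, 5000000, 4, prop)
    val order  = spark_session.read.jdbc(url, "订单", "订单ID", 0, 5000000, 4, prop)

    // Same left join as the SQL version, expressed with the DataFrame API
    val joined: DataFrame = detail
      .join(order, Seq("订单ID"), "left")
      .select("订单ID", "客户ID", "产品ID", "单价", "数量")

    // Write the result back to MySQL; 订单汇总 is a hypothetical target table
    joined.write.mode(SaveMode.Overwrite).jdbc(url, "订单汇总", prop)
  }
}

Passing Seq("订单ID") as the join key keeps a single 订单ID column in the output, so the select needs no table aliases.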