Spark Reading and Writing MySQL

First, as usual, the pom file:

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.45</version>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>

  </dependencies>


Code for reading from MySQL:

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MysqlRDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    // JdbcRDD takes a connection factory, a query with exactly two "?"
    // placeholders, the lower and upper bounds that fill those placeholders,
    // the number of partitions the bound range is split across, and a mapper
    // from each ResultSet row to the RDD element type.
    val jdbcrdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      () => {
        // Driver class for mysql-connector-java 5.x; the 8.x connector
        // renames it to com.mysql.cj.jdbc.Driver.
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      "select * from orders where realTotalMoney > ? and realTotalMoney < ?",
      150, // lowerBound
      151, // upperBound
      1,   // numPartitions
      r => (1 to 5).map(i => r.getString(i)).mkString(",")
    )
    jdbcrdd.foreach(println)
    println(jdbcrdd.count())

    sparkContext.stop()
  }
}
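
Since spark-sql is already a dependency, the same table can also be read through the DataFrame JDBC source, which infers column names and types from the table metadata instead of mapping the ResultSet by hand. A minimal sketch, assuming the same host, database, and credentials as above (MysqlDF is only an illustrative name):

import java.util.Properties

import org.apache.spark.sql.SparkSession

object MysqlDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readMysqlDF")
      .master("local[*]")
      .getOrCreate()

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    // Load the table as a DataFrame and filter with an SQL expression.
    val orders = spark.read.jdbc("jdbc:mysql://hadoop01:3306/transaction", "orders", props)
    orders.filter("realTotalMoney > 150 and realTotalMoney < 151").show()

    spark.stop()
  }
}

For a partitioned read, the longer jdbc overload takes a numeric column name plus lower/upper bounds and a partition count, playing the same role as JdbcRDD's two placeholders.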


Writing to MySQL; there is an efficiency pitfall to watch for here:

Inefficient version (a new connection is opened, and torn down, for every single record):

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // foreach runs once per record, so every single row pays the full cost
    // of loading the driver and opening (and closing) its own connection.
    rdd.foreach { case (id, name, age) =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val sql = "insert into student(id,name,age) values(?,?,?)"
      val preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setInt(1, id)
      preparedStatement.setString(2, name)
      preparedStatement.setInt(3, age)
      preparedStatement.executeUpdate()
      preparedStatement.close()
      connection.close() // release the per-record connection
    }

    sparkContext.stop()
  }
}


Improved version: foreachPartition opens one connection per partition instead of one per record, so the connection cost is amortized over all rows in the partition:

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    rdd.foreachPartition { it =>
      // One driver load, one connection, and one PreparedStatement per
      // partition; only the bind parameters change from row to row.
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val preparedStatement = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
      it.foreach { case (id, name, age) =>
        preparedStatement.setInt(1, id)
        preparedStatement.setString(2, name)
        preparedStatement.setInt(3, age)
        preparedStatement.executeUpdate()
      }
      preparedStatement.close()
      connection.close()
    }
    sparkContext.stop()
  }
}
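
One step further, as a sketch: keep the per-partition PreparedStatement and use JDBC batching, queuing rows with addBatch and sending them in a single round trip with executeBatch. Table, host, and credentials are assumed to match the examples above; RddToMysqlBatch is only an illustrative name:

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysqlBatch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysqlBatch").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val ps = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
      try {
        // Queue every row of the partition into one JDBC batch instead of
        // issuing an executeUpdate round trip per row.
        it.foreach { case (id, name, age) =>
          ps.setInt(1, id)
          ps.setString(2, name)
          ps.setInt(3, age)
          ps.addBatch()
        }
        ps.executeBatch()
      } finally {
        ps.close()
        connection.close()
      }
    }
    sparkContext.stop()
  }
}

Note that Connector/J only rewrites a batch into one multi-row INSERT when rewriteBatchedStatements=true is appended to the JDBC URL, and for large partitions it is safer to flush with executeBatch every few thousand rows. With DataFrames, df.write.jdbc manages connections and batching (see its batchsize option) by itself.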

