First, as usual, the pom file:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.45</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
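Depending on how the project was created, a Scala compiler plugin is usually also needed in the <build> section so that Maven actually compiles the Scala sources. A minimal sketch (the plugin version here is an assumption, adjust to your environment):

<build>
    <plugins>
        <!-- scala-maven-plugin compiles src/main/scala; version 3.2.2 is an assumption -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>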
Code: reading from MySQL
import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MysqlRDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    val jdbcrdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      // connection factory: runs on the executors, one connection per partition
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      // the two '?' placeholders are filled with each partition's lower/upper bound
      "select * from orders where realTotalMoney>? and realTotalMoney<?",
      150, // lower bound
      151, // upper bound
      1,   // number of partitions
      // map each ResultSet row to a String
      (r) => {
        r.getString(1) + "," +
        r.getString(2) + "," +
        r.getString(3) + "," +
        r.getString(4) + "," +
        r.getString(5)
      }
    )

    jdbcrdd.foreach(println)
    print(jdbcrdd.count())

    sparkContext.stop()
  }
}
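Since spark-sql is already on the classpath, the same table can also be read through the DataFrame API, which takes care of the JDBC plumbing for you. A minimal sketch, assuming the same host, database, and credentials as the JdbcRDD example above:

import org.apache.spark.sql.SparkSession

object MysqlDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readMysqlDF")
      .master("local[*]")
      .getOrCreate()

    // JDBC options reuse the connection info from the JdbcRDD example
    val ordersDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop01:3306/transaction")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "orders")
      .option("user", "root")
      .option("password", "root")
      .load()

    // same filter as the JdbcRDD bounds, expressed on the DataFrame
    ordersDF.filter("realTotalMoney > 150 and realTotalMoney < 151").show()

    spark.stop()
  }
}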
Writing to MySQL. There is an efficiency issue here that needs attention:
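Both versions below write to the same table. Something along these lines is assumed to exist in the test database (the column types are a guess based on the code):

-- assumed target table; adjust types and lengths to your schema
CREATE TABLE student (
  id   INT PRIMARY KEY,
  name VARCHAR(50),
  age  INT
);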
The inefficient version:
import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Inefficient: a new JDBC connection is opened for every single record
    rdd.foreach { case (a: Int, b: String, c: Int) =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val sql = "insert into student(id,name,age) values(?,?,?)"
      val preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setInt(1, a)
      preparedStatement.setString(2, b)
      preparedStatement.setInt(3, c)
      preparedStatement.executeUpdate()
      preparedStatement.close()
      connection.close()
    }

    sparkContext.stop()
  }
}
The improved version:
import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Better: one JDBC connection per partition instead of one per record
    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val sql = "insert into student(id,name,age) values(?,?,?)"
      it.foreach { case (a: Int, b: String, c: Int) =>
        val preparedStatement = connection.prepareStatement(sql)
        preparedStatement.setInt(1, a)
        preparedStatement.setString(2, b)
        preparedStatement.setInt(3, c)
        preparedStatement.executeUpdate()
        preparedStatement.close()
      }
      connection.close()
    }

    sparkContext.stop()
  }
}
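This can be pushed a bit further by reusing a single PreparedStatement per partition and sending the inserts as one JDBC batch. A minimal sketch of that variant, assuming the same table and connection details as above (note that with MySQL Connector/J you typically also need rewriteBatchedStatements=true in the JDBC URL for the batch to be rewritten into a multi-row insert):

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysqlBatch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysqlBatch").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      // one statement per partition, all rows queued into a single batch
      val preparedStatement =
        connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
      it.foreach { case (a, b, c) =>
        preparedStatement.setInt(1, a)
        preparedStatement.setString(2, b)
        preparedStatement.setInt(3, c)
        preparedStatement.addBatch()
      }
      preparedStatement.executeBatch() // flush all queued inserts at once
      preparedStatement.close()
      connection.close()
    }

    sparkContext.stop()
  }
}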