/**
* Created by songcl on 2016/6/24.
*/
import java.sql.DriverManager
//val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object insertmysql { def main(args:Array[String]): Unit = {
//classOf[com.mysql.jdbc.Driver]
// Class.forName("com.mysql.jdbc.Driver").newInstance();
val url = "jdbc:mysql://10.0.73.46:3306/falcon?user=data&password=datass"
val conf = new SparkConf().setAppName("Simple Application")//.setMaster("spark://10.0.58.21:7077")
val sc= new SparkContext(conf)
// val conn = DriverManager.getConnection(url)
// val conn2=DriverManager.getConnection(url)
val format = new java.text.SimpleDateFormat("yyyy/MM/dd")
val dat = format.format(new java.util.Date())
val log1 = sc.textFile("hdfs://10.0.58.21:9000/falcon/" + dat + "/*.log")
//val log1 =sc.textFile( "hdfs://10.0.58.21:9000/falcon/"+dat+"/*.log")
// println(log1.count())
//val sqlContext = new org.apache.spark.sql.SQLContext(sc1)
//val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://10.0.58.21:7077")
// val sc = new SparkContext(conf)
// val log=sc.textFile(logFile)
//val t= log.take(2)
//val log1=sc.textFile("hdfs://10.0.58.21:9000/falcon/2016/*/*/*.log")
val format2 = new java.text.SimpleDateFormat("yyyyMMdd")
val dat2 = format2.format(new java.util.Date())
//val log2=sc1.textFile("hdfs://10.0.58.21:9000/user/yejin/logstash."+dat2+".log")
val log2 = sc.textFile("hdfs://10.0.58.21:9000/falcon/" + dat + "/*.log")
import sqlContext.implicits
val rowRDD=log2.map(line=>(line.split("\"message\":\"").last.split(" ").head.trim(),line.split("account: ").last.split(", args:").head)).filter({case(k,v) =>k.length==10 && !k.contains("TypeError:")}).filter({case(k,v)=>v.length==8}).distinct
//import sqlContext.implicits
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df=rowRDD.toDF("created","user_id")
df.insertIntoJDBC(url,"userlog",false)
//println(log2.count())
}}
参考链接
http://www.jianshu.com/p/b1a709a57faa
提交包,前提是要配置环境变量
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/mysql/mysql.jar
spark-submit --class insertmysql /home/deploy/mysqltest.jar