Scala MySQL JDBC operations with Spark
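The example targets Spark 2.2 and reads from / writes to a MySQL table over JDBC. It assumes the MySQL JDBC driver is on the classpath; a minimal build.sbt sketch (the version numbers here are illustrative only, adjust to your environment):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "mysql" % "mysql-connector-java" % "5.1.44"
)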

package egsql

import java.util.Properties

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object jdbc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcOperation").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    /////////////////////// read ///////////////////////////
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "xxx")
    val url = "jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull"
    val stud_scoreDF = sqlContext.read.jdbc(url, "dtspark", properties)
    stud_scoreDF.show()
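    // An equivalent read using the option-based DataFrameReader API (sketch only,
    // not used in the run below; the credentials are the same placeholders as above):
    //   val stud_scoreDF = sqlContext.read
    //     .format("jdbc")
    //     .option("url", url)
    //     .option("dbtable", "dtspark")
    //     .option("user", "root")
    //     .option("password", "xxx")
    //     .load()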
    /////////////////////// write ///////////////////////////
    // Create an RDD via parallelize
    val personRDD = sc.parallelize(Array("blm 5 144", "jerry 18 188", "kitty 5 166")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("score", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Row objects
    val rowRDD = personRDD.map(p => Row(p(0).trim, p(1).toInt, p(2).toInt))
    // Apply the schema information to the rowRDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Reuse the Properties object holding the database credentials
    // and append the data to the database table ("append" adds rows;
    // "overwrite" would recreate the table instead)
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://127.0.0.1:3306/spark",
      "dtspark", properties)
    // Print again
    stud_scoreDF.show()
    // Stop the SparkContext
    sc.stop()
  }
}

================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
// :: INFO SparkContext: Running Spark version 2.2.
// :: INFO SparkContext: Submitted application: JdbcOperation
// :: INFO SecurityManager: Changing view acls to: fangping
// :: INFO SecurityManager: Changing modify acls to: fangping
// :: INFO SecurityManager: Changing view acls groups to:
// :: INFO SecurityManager: Changing modify acls groups to:
// :: INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(fangping); groups with view permissions: Set(); users with modify permissions: Set(fangping); groups with modify permissions: Set()
// :: INFO Utils: Successfully started service 'sparkDriver' on port .
// :: INFO SparkEnv: Registering MapOutputTracker
// :: INFO SparkEnv: Registering BlockManagerMaster
// :: INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
// :: INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
// :: INFO DiskBlockManager: Created local directory at C:\Users\fangping\AppData\Local\Temp\blockmgr-b8154e0d-e77b-4ba9-818b-71d3ffb8c553
// :: INFO MemoryStore: MemoryStore started with capacity 339.6 MB
// :: INFO SparkEnv: Registering OutputCommitCoordinator
// :: INFO Utils: Successfully started service 'SparkUI' on port .
// :: INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.18.3.13:4040
// :: INFO Executor: Starting executor ID driver on host localhost
// :: INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port .
// :: INFO NettyBlockTransferService: Server created on 172.18.3.13:
// :: INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
// :: INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.18.3.13, , None)
// :: INFO BlockManagerMasterEndpoint: Registering block manager 172.18.3.13: with 339.6 MB RAM, BlockManagerId(driver, 172.18.3.13, , None)
// :: INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.18.3.13, , None)
// :: INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.18.3.13, , None)
// :: INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/E:/back/scalaWs/Spark2Demo/spark-warehouse/').
// :: INFO SharedState: Warehouse path is 'file:/E:/back/scalaWs/Spark2Demo/spark-warehouse/'.
// :: INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
// :: INFO CodeGenerator: Code generated in 239.883814 ms
// :: INFO CodeGenerator: Code generated in 15.603579 ms
// :: INFO SparkContext: Starting job: show at jdbc.scala:
// :: INFO DAGScheduler: Got job (show at jdbc.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (show at jdbc.scala:)
// :: INFO DAGScheduler: Parents of final stage: List()
// :: INFO DAGScheduler: Missing parents: List()
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at show at jdbc.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 7.5 KB, free 339.6 MB)
// :: INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.2 KB, free 339.6 MB)
// :: INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.3.13: (size: 4.2 KB, free: 339.6 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at show at jdbc.scala:) (first tasks are for partitions Vector())
// :: INFO TaskSchedulerImpl: Adding task set 0.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID , localhost, executor driver, partition , PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 0.0 (TID )
// :: INFO JDBCRDD: closed connection
// :: INFO Executor: Finished task 0.0 in stage 0.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID ) in ms on localhost (executor driver) (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: ResultStage (show at jdbc.scala:) finished in 0.224 s
// :: INFO DAGScheduler: Job finished: show at jdbc.scala:, took 2.466901 s
+-----+---+-----+
| name|age|score|
+-----+---+-----+
| swk|||
| blm| | |
|jerry| | |
|kitty| | |
+-----+---+-----+
// :: INFO SparkContext: Starting job: jdbc at jdbc.scala:
// :: INFO DAGScheduler: Got job (jdbc at jdbc.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (jdbc at jdbc.scala:)
// :: INFO DAGScheduler: Parents of final stage: List()
// :: INFO DAGScheduler: Missing parents: List()
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at jdbc at jdbc.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.8 KB, free 339.6 MB)
// :: INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.8 KB, free 339.6 MB)
// :: INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.18.3.13: (size: 5.8 KB, free: 339.6 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at jdbc at jdbc.scala:) (first tasks are for partitions Vector())
// :: INFO TaskSchedulerImpl: Adding task set 1.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID , localhost, executor driver, partition , PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 1.0 (TID )
// :: INFO CodeGenerator: Code generated in 13.199986 ms
// :: INFO CodeGenerator: Code generated in 86.854105 ms
// :: INFO Executor: Finished task 0.0 in stage 1.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID ) in ms on localhost (executor driver) (/)
// :: INFO DAGScheduler: ResultStage (jdbc at jdbc.scala:) finished in 0.473 s
// :: INFO DAGScheduler: Job finished: jdbc at jdbc.scala:, took 0.496857 s
// :: INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
// :: INFO SparkContext: Starting job: show at jdbc.scala:
// :: INFO DAGScheduler: Got job (show at jdbc.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (show at jdbc.scala:)
// :: INFO DAGScheduler: Parents of final stage: List()
// :: INFO DAGScheduler: Missing parents: List()
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at show at jdbc.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.5 KB, free 339.6 MB)
// :: INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.2 KB, free 339.6 MB)
// :: INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.18.3.13: (size: 4.2 KB, free: 339.6 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at show at jdbc.scala:) (first tasks are for partitions Vector())
// :: INFO TaskSchedulerImpl: Adding task set 2.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID , localhost, executor driver, partition , PROCESS_LOCAL, bytes)
// :: INFO Executor: Running task 0.0 in stage 2.0 (TID )
// :: INFO JDBCRDD: closed connection
// :: INFO Executor: Finished task 0.0 in stage 2.0 (TID ). bytes result sent to driver
// :: INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID ) in ms on localhost (executor driver) (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: ResultStage (show at jdbc.scala:) finished in 0.027 s
// :: INFO DAGScheduler: Job finished: show at jdbc.scala:, took 0.088025 s
+-----+---+-----+
| name|age|score|
+-----+---+-----+
| swk|||
| blm| | |
|jerry| | |
|kitty| | |
| blm| | |
|jerry| | |
|kitty| | |
+-----+---+-----+
// :: INFO SparkUI: Stopped Spark web UI at http://172.18.3.13:4040
// :: INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
// :: INFO MemoryStore: MemoryStore cleared
// :: INFO BlockManager: BlockManager stopped
// :: INFO BlockManagerMaster: BlockManagerMaster stopped
// :: INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
// :: INFO SparkContext: Successfully stopped SparkContext
// :: INFO ShutdownHookManager: Shutdown hook called
// :: INFO ShutdownHookManager: Deleting directory C:\Users\\AppData\Local\Temp\spark-e888fd39-2e41-43e5-829b-ca203b786cef
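
For reference, on Spark 2.x the same read can also go through SparkSession rather than the older SQLContext used above. A minimal sketch, assuming the same url, table name and properties as in the program:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("JdbcOperation")
  .master("local")
  .getOrCreate()
val df = spark.read.jdbc(url, "dtspark", properties)
df.show()
spark.stop()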