在pyspark读取多数据源中,读取mysql遇到的问题
from pyspark.sql import SparkSession from pyspark.sql.types import Row spark = SparkSession \ .builder \ .master("local[*]") \ .appName("DataSourceTest") \ .getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR") dataRDD = sc.textFile("G:\\data\\person.txt") lineRDD = dataRDD.map(lambda x: x.split(" ")) personRDD = lineRDD.map(lambda line: Row(line[0], line[1], line[2])) personDF = personRDD.toDF() personDF.show()
#测试方法一 oneDF.write \ .format("jdbc") \ .option("url", "jdbc:mysql://localhost:3306/exercise?characterEncoding=UTF-8") \ .option("dbtable", "person") \ .option("driver", "com.mysql.jdbc.Driver") \ .option("user", "root") \ .option("password", "root") \ .save()
#测试方法二
# oneDF.write \
# .jdbc(url="jdbc:mysql://localhost:3306/exercise?characterEncoding=UTF-8", table="person",
# properties={"user": "root", "password": "root"})
运行报错如下:
Traceback (most recent call last): File "G:/code_py/Send_a_player/Spark/SparkSQL/DataSourceTest.py", line 39, in <module> .option("password", "root") \ File "G:\code_py\Send_a_player\venv\lib\site-packages\pyspark\sql\readwriter.py", line 825, in save self._jwrite.save() File "G:\code_py\Send_a_player\venv\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__ answer, self.gateway_client, self.target_id, self.name) File "G:\code_py\Send_a_player\venv\lib\site-packages\pyspark\sql\utils.py", line 128, in deco return f(*a, **kw) File "G:\code_py\Send_a_player\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o61.save. : java.lang.ClassNotFoundException: com.mysql.jdbc.Driver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99) at scala.Option.foreach(Option.scala:407) at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99) at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:194) at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:198) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)
2020-12-24 08:51:15