PySpark Learning Notes


While experimenting with multiple data sources in PySpark, I hit the following problem when connecting to MySQL over JDBC:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("DataSourceTest") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Read the raw text file and split each line on single spaces.
dataRDD = sc.textFile("G:\\data\\person.txt")
lineRDD = dataRDD.map(lambda x: x.split(" "))
# Rows built without field names get auto-generated columns _1, _2, _3.
personRDD = lineRDD.map(lambda line: Row(line[0], line[1], line[2]))

personDF = personRDD.toDF()
personDF.show()
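
As an aside, naming the Row fields gives the DataFrame meaningful column names instead of the auto-generated _1, _2, _3. A minimal sketch, assuming each line of person.txt holds an id, a name, and an age (the field names and types are assumptions, not something the original file confirms):

# Assumed layout of person.txt: "<id> <name> <age>" per line.
personRDD = lineRDD.map(lambda line: Row(id=int(line[0]), name=line[1], age=int(line[2])))
personDF = personRDD.toDF()
personDF.show()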

# Method 1
personDF.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/exercise?characterEncoding=UTF-8") \
    .option("dbtable", "person") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("user", "root") \
    .option("password", "root") \
    .save()

# Method 2
# personDF.write \
#     .jdbc(url="jdbc:mysql://localhost:3306/exercise?characterEncoding=UTF-8",
#           table="person",
#           properties={"user": "root", "password": "root"})

Running it raises the following error:

Traceback (most recent call last):
  File "G:/code_py/Send_a_player/Spark/SparkSQL/DataSourceTest.py", line 39, in <module>
    .option("password", "root") \
  File "G:\code_py\Send_a_player\venv\lib\site-packages\pyspark\sql\readwriter.py", line 825, in save
    self._jwrite.save()
  File "G:\code_py\Send_a_player\venv\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "G:\code_py\Send_a_player\venv\lib\site-packages\pyspark\sql\utils.py", line 128, in deco
    return f(*a, **kw)
  File "G:\code_py\Send_a_player\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o61.save.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:194)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:198)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
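
The root cause: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver means the MySQL JDBC driver jar is not on Spark's classpath, so the JDBC data source cannot load the driver class. A minimal fix sketch, assuming a Connector/J 5.x jar downloaded locally (the jar path and version below are assumptions; point them at your actual mysql-connector-java file):

from pyspark.sql import SparkSession

# Register the MySQL connector jar with Spark so the driver class can be
# loaded. The path is an assumption -- use your local jar location.
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("DataSourceTest") \
    .config("spark.jars", "G:\\jars\\mysql-connector-java-5.1.49.jar") \
    .getOrCreate()

Alternatively, drop the jar into the jars directory of the Spark installation, or pass it on the command line with spark-submit --jars. Also note that com.mysql.jdbc.Driver is the class name for Connector/J 5.x; Connector/J 8.x renamed it to com.mysql.cj.jdbc.Driver, so the driver option must match the jar version you ship.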
