pyspark 入门

为什么用

python中pandas是数据分析的利器,具有并行的特兹那个,而且函数和数据计算的方法非常方便,是数据分析中的瑞士军刀。但是受限于单个机器性能和配置的限制,当大规模数据,比如100G-10TB规模的数据时,pandas就显得局限了,就像瑞士军刀杀牛,难以下手。这时就需要基于分布式计算的大数据工具spark,是基于分布式计算,可以基于hadoop和hive,进行分布式的数据计算,同时spark具有python API,可以通过类似python的语法,无门槛的过渡。

怎么用

pyspark支持RDD和DataFrame的数据类型,但是RDD在python中相比于scala和R语言,是非常慢的,而DataFrame使性能在各种语言中都保持稳定。所以我们可以试用pyspark的DataFrame对大规模数据进行数据清理,然后转为pandas.dataframe,进行数据可视化。

初始化类 class pyspark.sql.SparkSession 类

spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate

Builder for SparkSession 这个就是生成一个 sparksession 实例。他下面有一些支持的函数
master: 设置 spark master 的 url 。由于我们集群使用的是 spark on yarn 的模式,所以可以用选择不设置这个参数。

appName: 在程序运行的时候看到的应用名称。

config: 其实其他参数也是调用 .config 设置相应的 config 参数,例如 .master 就是调用了 .config(“spark.master”, master)。

enableHiveSupport: 启动对 hive 的支持。例如支持 hivesql 以及 hive udf 等。

getOrCreate: 得到一个现成的 SparkSession ,如果没有就生成一个。

  • SparkSession.catalog:
    提供一个接口来操作 create drop alter query 库或者表,比如:
catalog.listTables().select($"name").show(2,false)
  • SparkSession.createDataFrame:
    可以获得从 rdd python list 和 pandas df 创建 df 的能力。下面贴一下官方的例子:
l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]


>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]


>>> rdd = sc.parallelize(l)
>>> spark.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
  • SparkSession.sql:
    使用sql方法返回的是df,例如:
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
  • SparkSession.table
    这个可以返回指定表的 df ,将表复制成df

函数说明

  • 查询
    1,.show(n),方法默认显示前10行
    2,spark.sql("select * from ").collect()

利用DataFrame API查询

  • swimmers.count() 返回dataframe 的行数
  • swimmers.select().filter().show() 返回筛选后数据

利用sql查询

  • spark.sql(“select count(1) from swimmers”).show()
    sql语句和sql语言一致,也可以试用where 筛选

DataFrame函数说明

  • agecol=people.age 选择其中的列

  • agg(*exps)
    应用到整个没有分组的dataframe上,全称是df.groupby().agg()

  • alias(alias)
    返回一个设置了别名alias的dataframe,具有重命名的功能

df_as1 = df.alias("df_as1")
  • approxQuantile(col,pro probabilities, relativeError)
    返回一个列的中值列表

  • cache()
    返回dataframed的存贮水平

  • checkpoint([eager])
    返回这个数据集的检查点版本,检查点可以用来截断这个DataFrame的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。它将保存到SparkContext.setCheckpointDir()设置的检查点目录中的文件中。

  • coalesce(numPartitions)

  • colRegex(colName)

  • collect()
    返回所有的数据记录的list

  • corr(col1, col2[, method])

  • count()
    返回dataframe的行数

  • cov(col1, col2)
    计算给定列的样本协方差

  • createGlobalTempView(name)
    通过 df 创建一个全局的临时表。他的生命周期跟 spark 应用的生命周期相关联。如果视图中已经存在这个名字了会抛出 TempTableAlreadyExistsException 的错误。

  • createOrReplaceGlobalTempView(name)
    这个对比 createGlobalTempView 不同的点是不会抛出 TempTableAlreadyExistsException 的错误,会直接替换。

  • createOrReplaceTempView(name)
    使用现成的df创建临时视图,可以使用sql语句获取数据

  • createTempView(name)

  • crossJoin(other)
    和另外一个 df 取笛卡尔积例如

>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df2.select("name", "height").collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
>>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
 Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
  • crosstab(col1, col2)
  • cube(*cols)
  • describe(*cols)
    计算数字和字符串列的基本统计信息
>>> df.describe(['age']).show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+
>>> df.describe().show()
+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 2|    2|
|   mean|               3.5| null|
| stddev|2.1213203435596424| null|
|    min|                 2|Alice|
|    max|                 5|  Bob|
+-------+------------------+-----+
  • distinct()
    返回一个不包含重复值的新的dataframe
  • drop(*cols)
    返回删除指定列的dataframe
  • dropDuplicates([subset])
    返回一个新的 df ,这个 df 里面不再有重复的记录。可选参数可以让我们选择关心的字段进行去重。
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
  • drop_duplicates([subset])

  • dropna([how, thresh, subset])
    返回了删除含有空值的列的dataframe

  • exceptAll(other)

  • explain([extended, mode])

  • fillna(value[, subset])
    填充缺失值,subset指定需要填充的列

>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    50|  Bob|
| 50|    50|  Tom|
| 50|    50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10|    80|  Alice|
|  5|  null|    Bob|
| 50|  null|    Tom|
| 50|  null|unknown|
+---+------+-------+
  • filter(condition)
    给定特定条件过滤,和where是同名函数
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]

>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]

>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]

>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
  • first()
    返回第一条 df

  • foreach(f)
    定义一个函数,会让每个 df 都执行该函数

>>> def f(person):
...     print(person.name)
>>> df.foreach(f)
  • foreachPartition(f)

定义一个函数,会让每个 partitions 都执行这个函数

>>> def f(people):
...     for person in people:
...         print(person.name)
>>> df.foreachPartition(f)
  • freqItems(cols[, support])

  • groupBy(*cols)=group_by
    可以使用 agg 方法对其进行各种各样的聚合, spark sql 专门有个类为其提供了非常多的处理函数。See GroupedData for all the available aggregate functions.

>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect())
[Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
  • head([n]) 返回前n行
    没有指定n时,默认是一行

  • hint(name, *parameters)

  • inputFiles()

  • intersect(other)

  • intersectAll(other)

  • isLocal()

  • join(other, on, how)
    使用给定的条件和其他 df 进行 join。
    other:另外一个df.
    on:条件,join的位置 列名
    how:默认是inner

  • limit(num)
    限制拿多少条

  • localCheckpoint([eager])

  • mapInPandas(func, schema)

  • orderBy(*cols, **kwargs)
    返回一个被指定 col 排序好的 df

>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
  • persist([storageLevel])
  • printSchema()
    打印出该 df 的 schema
  • randomSplit(weights[, seed])
  • registerTempTable(name)
    将 df 注册成一个临时生命周期是 SparkSession 的周期,这个跟上面 createOrReplaceTempView 互为同名函数,就是调用该函数生成的。
  • repartition(numPartitions, *cols)
    返回一个新的 df,这个新的 df 被给定的 numPartitions 数量进行 hash 重分区。numPartitions可以是指定分区或列的目标数量的int。如果它是一个列,它将被用作第一个分区列。如果没有指定,则使用默认分区数。
  • repartitionByRange(numPartitions, *cols)
  • replace(to_replace, value, subset)
    返回一个 df 用参数位置2的值替换掉参数位置是1的值。DataFrame.replace() and DataFrameNaFunctions.replace()互为同名函数
>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  20|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+

>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   A|
|   5|  null|   B|
|null|  null| Tom|
|null|  null|null|
+----+------+----+
  • rollup(*cols)
  • sameSemantics(other)
  • sample([withReplacement, fraction, seed])
  • sampleBy(col, fractions[, seed])
  • select(*cols)
    返回 select 能找到的数据
  • selectExpr(*expr)
  • semanticHash()
  • show([n, truncate, vertical])
    打印前n行数据带控制台
  • sort(*cols, **kwargs)
    根据给定的 cols 进行排序之后返回新的 df
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
  • sortWithinPartitions(*cols, **kwargs)

  • subtract(other)
    取目标 df ohter 和 df 的补集。

  • summary(*statistics)

  • tail(num)

  • take(num)
    返回df前num条数据

  • toDF(*cols)
    返回新的df,新的列名对应list顺序

  • toJSON([use_unicode])
    将dataframe 数据格式转成rdd格式

  • toLocalIterator([prefetchPartitions])
    返回一个迭代器,包含了dataframe的所有列

  • toPandas()
    将数据转换为pandas.dataframe数据类型

  • transform(func)
    返回一个新的dataframe

  • union(other)
    交集

  • unionAll(other)
    并集

  • unionByName(other[, allowMissingColumns])
    返回包含这个和其他dataframe中独一无二的列的,交集dataframe

  • unpersist([blocking])

  • where(condition)
    功能与filter一样

  • withColumn(colName, col)
    返回一个通过添加或替换列名的新的dataframe

  • withColumnRenamed(existing, new)
    返回一个重命名列名的dataframe

  • withWatermark(eventTime, delayThreshold)

  • writeTo(table)

DataFrame属性

  • columns
    以列表形式返回所有列名。
  • dtypes
    以列表形式返回所有列名及其数据类型。
  • isStreaming
  • na
  • rdd
  • schema
    返回dataframe的架构
  • stat
  • storageLevel
  • write
    非流接口向外部存储写入数据
  • writeStream
    流接口向外部存储写入数据

参考文章:
https://www.cnblogs.com/piperck/p/10446720.html
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html

上一篇:pyspark写入hive(二) 使用 saveAsTable


下一篇:08 学生课程分数的Spark SQL分析