为什么用
python中pandas是数据分析的利器,具有并行的特兹那个,而且函数和数据计算的方法非常方便,是数据分析中的瑞士军刀。但是受限于单个机器性能和配置的限制,当大规模数据,比如100G-10TB规模的数据时,pandas就显得局限了,就像瑞士军刀杀牛,难以下手。这时就需要基于分布式计算的大数据工具spark,是基于分布式计算,可以基于hadoop和hive,进行分布式的数据计算,同时spark具有python API,可以通过类似python的语法,无门槛的过渡。
怎么用
pyspark支持RDD和DataFrame的数据类型,但是RDD在python中相比于scala和R语言,是非常慢的,而DataFrame使性能在各种语言中都保持稳定。所以我们可以试用pyspark的DataFrame对大规模数据进行数据清理,然后转为pandas.dataframe,进行数据可视化。
初始化类 class pyspark.sql.SparkSession 类
spark = SparkSession.builder \
... .master("local") \
... .appName("Word Count") \
... .config("spark.some.config.option", "some-value") \
... .getOrCreate
Builder for SparkSession 这个就是生成一个 sparksession 实例。他下面有一些支持的函数
master: 设置 spark master 的 url 。由于我们集群使用的是 spark on yarn 的模式,所以可以用选择不设置这个参数。
appName: 在程序运行的时候看到的应用名称。
config: 其实其他参数也是调用 .config 设置相应的 config 参数,例如 .master 就是调用了 .config(“spark.master”, master)。
enableHiveSupport: 启动对 hive 的支持。例如支持 hivesql 以及 hive udf 等。
getOrCreate: 得到一个现成的 SparkSession ,如果没有就生成一个。
- SparkSession.catalog:
提供一个接口来操作 create drop alter query 库或者表,比如:
catalog.listTables().select($"name").show(2,false)
- SparkSession.createDataFrame:
可以获得从 rdd python list 和 pandas df 创建 df 的能力。下面贴一下官方的例子:
l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
>>> spark.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
- SparkSession.sql:
使用sql方法返回的是df,例如:
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
- SparkSession.table
这个可以返回指定表的 df ,将表复制成df
函数说明
- 查询
1,.show(n),方法默认显示前10行
2,spark.sql("select * from ").collect()
利用DataFrame API查询
- swimmers.count() 返回dataframe 的行数
- swimmers.select().filter().show() 返回筛选后数据
利用sql查询
- spark.sql(“select count(1) from swimmers”).show()
sql语句和sql语言一致,也可以试用where 筛选
DataFrame函数说明
-
agecol=people.age 选择其中的列
-
agg(*exps)
应用到整个没有分组的dataframe上,全称是df.groupby().agg() -
alias(alias)
返回一个设置了别名alias的dataframe,具有重命名的功能
df_as1 = df.alias("df_as1")
-
approxQuantile(col,pro probabilities, relativeError)
返回一个列的中值列表 -
cache()
返回dataframed的存贮水平 -
checkpoint([eager])
返回这个数据集的检查点版本,检查点可以用来截断这个DataFrame的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。它将保存到SparkContext.setCheckpointDir()设置的检查点目录中的文件中。 -
coalesce(numPartitions)
-
colRegex(colName)
-
collect()
返回所有的数据记录的list -
corr(col1, col2[, method])
-
count()
返回dataframe的行数 -
cov(col1, col2)
计算给定列的样本协方差 -
createGlobalTempView(name)
通过 df 创建一个全局的临时表。他的生命周期跟 spark 应用的生命周期相关联。如果视图中已经存在这个名字了会抛出 TempTableAlreadyExistsException 的错误。 -
createOrReplaceGlobalTempView(name)
这个对比 createGlobalTempView 不同的点是不会抛出 TempTableAlreadyExistsException 的错误,会直接替换。 -
createOrReplaceTempView(name)
使用现成的df创建临时视图,可以使用sql语句获取数据 -
createTempView(name)
-
crossJoin(other)
和另外一个 df 取笛卡尔积例如
>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df2.select("name", "height").collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
>>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect()
[Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
- crosstab(col1, col2)
- cube(*cols)
- describe(*cols)
计算数字和字符串列的基本统计信息
>>> df.describe(['age']).show()
+-------+------------------+
|summary| age|
+-------+------------------+
| count| 2|
| mean| 3.5|
| stddev|2.1213203435596424|
| min| 2|
| max| 5|
+-------+------------------+
>>> df.describe().show()
+-------+------------------+-----+
|summary| age| name|
+-------+------------------+-----+
| count| 2| 2|
| mean| 3.5| null|
| stddev|2.1213203435596424| null|
| min| 2|Alice|
| max| 5| Bob|
+-------+------------------+-----+
- distinct()
返回一个不包含重复值的新的dataframe - drop(*cols)
返回删除指定列的dataframe - dropDuplicates([subset])
返回一个新的 df ,这个 df 里面不再有重复的记录。可选参数可以让我们选择关心的字段进行去重。
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
-
drop_duplicates([subset])
-
dropna([how, thresh, subset])
返回了删除含有空值的列的dataframe -
exceptAll(other)
-
explain([extended, mode])
-
fillna(value[, subset])
填充缺失值,subset指定需要填充的列
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 80| Alice|
| 5| null| Bob|
| 50| null| Tom|
| 50| null|unknown|
+---+------+-------+
- filter(condition)
给定特定条件过滤,和where是同名函数
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
-
first()
返回第一条 df -
foreach(f)
定义一个函数,会让每个 df 都执行该函数
>>> def f(person):
... print(person.name)
>>> df.foreach(f)
- foreachPartition(f)
定义一个函数,会让每个 partitions 都执行这个函数
>>> def f(people):
... for person in people:
... print(person.name)
>>> df.foreachPartition(f)
-
freqItems(cols[, support])
-
groupBy(*cols)=group_by
可以使用 agg 方法对其进行各种各样的聚合, spark sql 专门有个类为其提供了非常多的处理函数。See GroupedData for all the available aggregate functions.
>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect())
[Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
-
head([n]) 返回前n行
没有指定n时,默认是一行 -
hint(name, *parameters)
-
inputFiles()
-
intersect(other)
-
intersectAll(other)
-
isLocal()
-
join(other, on, how)
使用给定的条件和其他 df 进行 join。
other:另外一个df.
on:条件,join的位置 列名
how:默认是inner -
limit(num)
限制拿多少条 -
localCheckpoint([eager])
-
mapInPandas(func, schema)
-
orderBy(*cols, **kwargs)
返回一个被指定 col 排序好的 df
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
- persist([storageLevel])
- printSchema()
打印出该 df 的 schema - randomSplit(weights[, seed])
- registerTempTable(name)
将 df 注册成一个临时生命周期是 SparkSession 的周期,这个跟上面 createOrReplaceTempView 互为同名函数,就是调用该函数生成的。 - repartition(numPartitions, *cols)
返回一个新的 df,这个新的 df 被给定的 numPartitions 数量进行 hash 重分区。numPartitions可以是指定分区或列的目标数量的int。如果它是一个列,它将被用作第一个分区列。如果没有指定,则使用默认分区数。 - repartitionByRange(numPartitions, *cols)
- replace(to_replace, value, subset)
返回一个 df 用参数位置2的值替换掉参数位置是1的值。DataFrame.replace() and DataFrameNaFunctions.replace()互为同名函数
>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 20| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80| A|
| 5| null| B|
|null| null| Tom|
|null| null|null|
+----+------+----+
- rollup(*cols)
- sameSemantics(other)
- sample([withReplacement, fraction, seed])
- sampleBy(col, fractions[, seed])
- select(*cols)
返回 select 能找到的数据 - selectExpr(*expr)
- semanticHash()
- show([n, truncate, vertical])
打印前n行数据带控制台 - sort(*cols, **kwargs)
根据给定的 cols 进行排序之后返回新的 df
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
-
sortWithinPartitions(*cols, **kwargs)
-
subtract(other)
取目标 df ohter 和 df 的补集。 -
summary(*statistics)
-
tail(num)
-
take(num)
返回df前num条数据 -
toDF(*cols)
返回新的df,新的列名对应list顺序 -
toJSON([use_unicode])
将dataframe 数据格式转成rdd格式 -
toLocalIterator([prefetchPartitions])
返回一个迭代器,包含了dataframe的所有列 -
toPandas()
将数据转换为pandas.dataframe数据类型 -
transform(func)
返回一个新的dataframe -
union(other)
交集 -
unionAll(other)
并集 -
unionByName(other[, allowMissingColumns])
返回包含这个和其他dataframe中独一无二的列的,交集dataframe -
unpersist([blocking])
-
where(condition)
功能与filter一样 -
withColumn(colName, col)
返回一个通过添加或替换列名的新的dataframe -
withColumnRenamed(existing, new)
返回一个重命名列名的dataframe -
withWatermark(eventTime, delayThreshold)
-
writeTo(table)
DataFrame属性
- columns
以列表形式返回所有列名。 - dtypes
以列表形式返回所有列名及其数据类型。 - isStreaming
- na
- rdd
- schema
返回dataframe的架构 - stat
- storageLevel
- write
非流接口向外部存储写入数据 - writeStream
流接口向外部存储写入数据
参考文章:
https://www.cnblogs.com/piperck/p/10446720.html
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html