Data Processing using Pyspark
1. A first look at the mock data
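The original post displays the mock dataset here: a small CSV (sample_data.csv) with 33 records and five columns. Four of the columns (ratings, age, experience, mobile) appear in the code below; the fifth is assumed here to be family. A purely illustrative sketch of the file layout, not the real rows:
# hypothetical head of sample_data.csv -- values made up for illustration
ratings,age,experience,family,mobile
3,32,9.0,3,Vivo
4,27,13.0,3,Apple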
2. Creating a SparkSession and reading the data
# import SparkSession
from pyspark.sql import SparkSession
# create spark session object
spark = SparkSession.builder.appName('data_processing').getOrCreate()
# Load csv Dataset
df = spark.read.csv('sample_data.csv',inferSchema=True,header=True)
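inferSchema=True makes Spark scan the file to guess column types. When the types are known up front, passing an explicit schema skips that extra pass and avoids type surprises; a minimal sketch, assuming the five-column layout described above:
# define the schema explicitly instead of inferring it (column names assumed)
from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,StringType
schema = StructType([
    StructField('ratings',IntegerType(),True),
    StructField('age',IntegerType(),True),
    StructField('experience',DoubleType(),True),
    StructField('family',IntegerType(),True),
    StructField('mobile',StringType(),True)
])
df = spark.read.csv('sample_data.csv',schema=schema,header=True)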
3. Inspecting basic DataFrame information
List the columns (fields)
# columns of dataframe
df.columns
Count the columns (fields)
# check number of columns
len(df.columns) # 5
Count the records
# number of records in dataframe
df.count() # 33
Check the shape (dimensions)
# shape of dataset
print((df.count(),len(df.columns))) # (33, 5)
Print the schema tree
# printSchema shows the tree structure of the fields
df.printSchema()
Show the first n records
# first few rows of dataframe
df.show(5)
Select specific columns
# select only 2 columns
df.select('age','mobile').show(5)
View summary statistics
# summary statistics of the dataframe
df.describe().show()
4. Basic operations
Add a column
from pyspark.sql.types import StringType,DoubleType,IntegerType
# add a derived column with withColumn
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
Change a column's type
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
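cast() also accepts the type name as a string, which saves the type import:
# equivalent cast using the string type name
df.withColumn('age_double',df['age'].cast('double')).show(10,False)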
Filtering with filter
# filter the records where mobile is 'Vivo'
df.filter(df['mobile']=='Vivo').show()
Filter + select
# filter the records
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()
Multiple conditions
# multiple conditions via chained filters
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()
# multiple conditions combined with & in a single filter
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
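filter() also accepts a SQL expression string, which can read more naturally for compound conditions:
# same multiple-condition filter as a SQL expression
df.filter("mobile = 'Vivo' and experience > 10").show()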
Distinct values in a column (the levels of a feature)
# Distinct Values in a column
df.select('mobile').distinct().show()
# distinct value count
df.select('mobile').distinct().count() # 5
groupBy
df.groupBy('mobile').count().show(5,False)
orderBy
# Value counts
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
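The same descending sort can also be written with the Column API, e.g. ordering the raw records by age:
# sort records by age, descending
df.orderBy(df['age'].desc()).show(5,False)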
mean
# mean of numerical columns per brand
df.groupBy('mobile').mean().show(5,False)
sum
# sum of numerical columns per brand
df.groupBy('mobile').sum().show(5,False)
max
# max of numerical columns per brand
df.groupBy('mobile').max().show(5,False)
min
# min of numerical columns per brand
df.groupBy('mobile').min().show(5,False)
agg
# Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
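The dict form of agg() allows only one aggregation per column. The functions module supports several named aggregations at once; a short sketch:
# multiple aggregations with aliased result columns
from pyspark.sql import functions as F
df.groupBy('mobile').agg(F.sum('experience').alias('total_exp'),
                         F.mean('ratings').alias('avg_rating')).show(5,False)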
5. UDFs (user-defined functions)
from pyspark.sql.functions import udf
A plain Python function
# normal function
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'
# create udf using python function
brand_udf = udf(price_range,StringType())
# apply udf on dataframe
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
Lambda expression
# using lambda function
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
# apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10,False)
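For a simple threshold like this, a built-in column expression avoids the UDF serialization overhead entirely; an equivalent using when/otherwise:
# native alternative to the lambda udf above
from pyspark.sql.functions import when
df.withColumn('age_group', when(df['age'] <= 30, 'young').otherwise('senior')).show(10,False)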
pandas udf
Recommended: pandas UDFs operate on whole pandas Series per batch (vectorized), so they are usually much faster than row-at-a-time UDFs.
from pyspark.sql.functions import pandas_udf, PandasUDFType
# create python function
def remaining_yrs(age):
    yrs_left=100-age
    return yrs_left
# create udf using python function
length_udf = pandas_udf(remaining_yrs, IntegerType())
# apply pandas udf on dataframe
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
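Because a pandas UDF receives and returns whole pandas Series, the subtraction above runs vectorized on each batch. On Spark 3.x the same UDF can also be written with the decorator and type-hint syntax; a sketch (remaining_yrs_v2 is just an illustrative name):
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

@pandas_udf(IntegerType())
def remaining_yrs_v2(age: pd.Series) -> pd.Series:
    # vectorized: operates on the whole Series at once
    return 100 - age

df.withColumn("yrs_left", remaining_yrs_v2(df['age'])).show(10,False)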
Applying a UDF to multiple columns
# udf using two columns
def prod(rating,exp):
    x=rating*exp
    return x
# create udf using python function
prod_udf = pandas_udf(prod, DoubleType())
# apply pandas udf on multiple columns of dataframe
df.withColumn("product", prod_udf(df['ratings'],df['experience'])).show(10,False)
6. Dropping rows and columns
Deduplicate with dropDuplicates
# count with duplicate rows still present
df.count() # 33
# drop duplicate values
df=df.dropDuplicates()
# validate new count
df.count() # 26
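dropDuplicates also accepts a subset of columns, keeping one row per distinct combination of just those columns:
# keep one row per mobile brand (5 distinct brands, per the distinct count above)
df.dropDuplicates(subset=['mobile']).count() # 5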
Drop a column
# drop column of dataframe
df_new=df.drop('mobile')
df_new.show(10)
7. Saving the data
# target directory
write_uri='/home/jovyan/work/df_csv'
# save the dataframe as a single csv file
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)
# target location
parquet_uri='/home/jovyan/work/df_parquet'
# save the data into parquet format
df.write.format('parquet').save(parquet_uri)
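To verify the write, the saved Parquet data can be read straight back (df_parquet is just an illustrative name):
# read the parquet files back and confirm the record count
df_parquet = spark.read.parquet(parquet_uri)
df_parquet.count() # 26, matching the deduplicated dataframe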