PySpark: a classic data preprocessing demo

Table of contents


Data Processing using PySpark

1. A first look at the sample data

(The original post shows a screenshot of sample_data.csv here; from the code below, the dataset has 33 rows and 5 columns, including age, ratings, experience and mobile.)

2. Create a SparkSession and read the data

# import SparkSession
from pyspark.sql import SparkSession

# create spark session object
spark = SparkSession.builder.appName('data_processing').getOrCreate()

# Load csv Dataset 
df = spark.read.csv('sample_data.csv',inferSchema=True,header=True)
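
A sketch (not from the original post): inferSchema=True makes Spark scan the file just to guess column types. For a known layout you can pass an explicit schema instead; the column names and types below are assumptions based on the columns used later in this post, and the fifth column name is a placeholder.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# hypothetical schema; adjust names/types to the real sample_data.csv
schema = StructType([
    StructField('ratings', IntegerType(), True),
    StructField('age', IntegerType(), True),
    StructField('experience', DoubleType(), True),
    StructField('family', IntegerType(), True),   # placeholder name for the 5th column
    StructField('mobile', StringType(), True),
])
df = spark.read.csv('sample_data.csv', schema=schema, header=True)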

3. Basic dataframe inspection

Get the column (field) names

# columns of dataframe
df.columns


Number of columns (fields)

# check number of columns
len(df.columns)				# 5

Number of records

# number of records in dataframe
df.count()					# 33

Shape of the dataset

# shape of dataset
print((df.count(),len(df.columns)))		# (33, 5)

Print the schema

# printSchema prints the column names, types and nullability as a tree
df.printSchema()


Show the first n records

# first few rows of the dataframe
df.show(5)


Select a subset of columns

# select only 2 columns
df.select('age','mobile').show(5)


Summary statistics

# summary statistics (count, mean, stddev, min, max) of the dataframe
df.describe().show()


4. Basic operations

Add a column

from pyspark.sql.types import StringType,DoubleType,IntegerType

# with column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)


Cast a column to another type

df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)



Filtering records with filter

# filter the records where mobile is 'Vivo'
df.filter(df['mobile']=='Vivo').show()


Filter + select

# filter the records 
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()


Multiple conditions

# filter the multiple conditions
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()


# filter the multiple conditions
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()

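A related sketch (not in the original post): when filtering on several values of the same column, Column.isin can replace a chain of equality checks; the brand list here is illustrative.

# keep rows whose mobile brand is in a given list
df.filter(df['mobile'].isin('Vivo','Oppo')).show()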

Distinct values in a column (the levels of a feature)

# Distinct Values in a column
df.select('mobile').distinct().show()


# distinct value count
df.select('mobile').distinct().count()		# 5

groupBy

df.groupBy('mobile').count().show(5,False)


orderBy

# Value counts
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)


mean

# group-wise mean of the numeric columns
df.groupBy('mobile').mean().show(5,False)


sum

df.groupBy('mobile').sum().show(5,False)


max

# group-wise max of the numeric columns
df.groupBy('mobile').max().show(5,False)


min

# group-wise min of the numeric columns
df.groupBy('mobile').min().show(5,False)


agg

# Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

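A sketch (not in the original post): several aggregations can be combined in one agg call using the built-in functions from pyspark.sql.functions; the aliases here are illustrative.

from pyspark.sql import functions as F

# multiple aggregations per mobile brand in a single pass
df.groupBy('mobile').agg(
    F.sum('experience').alias('total_exp'),
    F.avg('ratings').alias('avg_rating')
).show(5,False)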

5.UDF

from pyspark.sql.functions import udf

A plain Python function

# normal function 
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'
# create udf using python function
brand_udf = udf(price_range,StringType())

# apply udf on dataframe
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)

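A related sketch (not in the original post): the same brand-to-price mapping can be written with the built-in when/otherwise expressions, which avoid the serialization overhead of a Python UDF.

from pyspark.sql.functions import when, col

# same logic as price_range, but using built-in column expressions
df.withColumn('price_range',
              when(col('mobile').isin('Samsung','Apple'), 'High Price')
              .when(col('mobile')=='MI', 'Mid Price')
              .otherwise('Low Price')).show(10,False)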

A lambda expression

# using lambda function
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())

# apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10,False)


pandas UDF

Recommended: pandas UDFs operate on batches of data as pandas Series (via Apache Arrow), so they are usually much faster than the row-at-a-time UDFs above.

from pyspark.sql.functions import pandas_udf, PandasUDFType
# create python function (with a pandas UDF, age arrives as a pandas Series,
# so 100 - age is computed element-wise)
def remaining_yrs(age):
    yrs_left = 100 - age
    return yrs_left

# create udf using python function
length_udf = pandas_udf(remaining_yrs, IntegerType())

# apply pandas udf on dataframe
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

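A sketch, assuming Spark 3.x: the same pandas UDF can also be written with the decorator form and pandas type hints.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('int')
def remaining_yrs_v2(age: pd.Series) -> pd.Series:
    # vectorized: runs on a whole pandas Series at once
    return 100 - age

df.withColumn('yrs_left', remaining_yrs_v2(df['age'])).show(10,False)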

A UDF over multiple columns

# udf using two columns (as a pandas UDF, rating and exp are pandas Series)
def prod(rating, exp):
    x = rating * exp
    return x

# create udf using python function
prod_udf = pandas_udf(prod, DoubleType())

# apply pandas udf on multiple columns of dataframe
df.withColumn("product", prod_udf(df['ratings'],df['experience'])).show(10,False)


6. Dropping

Remove duplicate rows with dropDuplicates

# duplicate values
df.count()					# 33

# drop duplicate values
df=df.dropDuplicates()

# validate new count
df.count()					# 26
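
A sketch (not in the original post): dropDuplicates can also deduplicate on a subset of columns instead of whole rows; the column pair here is illustrative.

# keep only one row per (age, mobile) combination
df.dropDuplicates(['age','mobile']).count()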

Drop a column

# drop column of dataframe
df_new=df.drop('mobile')

df_new.show(10)


7. Saving

# target directory 
write_uri='/home/jovyan/work/df_csv'

# save the dataframe as single csv 
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

# target location
parquet_uri='/home/jovyan/work/df_parquet'

# save the data into parquet format 
df.write.format('parquet').save(parquet_uri)
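
A sketch (not in the original post): the saved files can be read back to verify the write.

# read the saved outputs back
df_csv = spark.read.csv(write_uri, inferSchema=True, header=True)
df_parquet = spark.read.parquet(parquet_uri)
print((df_parquet.count(), len(df_parquet.columns)))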