Pyspark中的DataFrame操作汇总

1. 读取csv文件为DataFrame

通过Pyspark直接读取csv文件可以直接以DataFrame类型进行读取,通过利用schema模式来进行指定模式。

假设我有一个.csv文件,里面有四列数据,长这样,

Pyspark中的DataFrame操作汇总

该.csv文件没有header。分别为用户id,电影id, 电影评分,时间戳 

通过导入Spark SQL中引入数据类型,

import pyspark.sql.types as typ
movie_labels=[('user_id' ,typ.IntegerType()),('product_id',typ.IntegerType()),('rating_id',typ.StringType()),('time_stamp',typ.StringType())]
movie_schema=typ.StructType([typ.StructField(e[0],e[1],False) for e in movie_labels])
movie_rdd =spark.read.csv("/data/lin/train_data/movie_test/u.csv",header=False,schema=movie_schema,sep=",")
movie_rdd.show(2)

StructField可以被分解为一下部分   StructField(name, dataType,nullable) 

name:该字段名字  ; dataType:该字段的数据类型  ; nullable:指示此字段的值是否为空 

Pyspark中的DataFrame操作汇总

结果如上图所示 

2. 对DataFrame生成临时视图 

movie_rdd.createOrReplaceTempView("movie_query")

生成临时试图,相当于就是一个表,可以进行写sql进行相关的一些操作,比如查询电影ID为242的数据

Pyspark中的DataFrame操作汇总

3. 获取DataFrame中某一列的值

movie_rdd.select('product_id').distinct().show()

比如这里查看product_id列有哪些取值

Pyspark中的DataFrame操作汇总

如果想要将结果赋值,那么就在后面写上collect()那么就会存在数组中 

result=movie_rdd.select('product_id').distinct().collect()

Pyspark中的DataFrame操作汇总

那么常用的一些DataFrame操作就都可以使用啦, 就不一一列举啦!

上一篇:8.最佳电影聚类分析


下一篇:day02 request请求库爬取豆瓣电影信息,selenium