1. 读取csv文件为DataFrame
通过Pyspark直接读取csv文件可以直接以DataFrame类型进行读取,通过利用schema模式来进行指定模式。
假设我有一个.csv文件,里面有四列数据,长这样,
该.csv文件没有header。分别为用户id,电影id, 电影评分,时间戳
通过导入Spark SQL中引入数据类型,
import pyspark.sql.types as typ
movie_labels=[('user_id' ,typ.IntegerType()),('product_id',typ.IntegerType()),('rating_id',typ.StringType()),('time_stamp',typ.StringType())]
movie_schema=typ.StructType([typ.StructField(e[0],e[1],False) for e in movie_labels])
movie_rdd =spark.read.csv("/data/lin/train_data/movie_test/u.csv",header=False,schema=movie_schema,sep=",")
movie_rdd.show(2)
StructField可以被分解为一下部分 StructField(name, dataType,nullable)
name:该字段名字 ; dataType:该字段的数据类型 ; nullable:指示此字段的值是否为空
结果如上图所示
2. 对DataFrame生成临时视图
movie_rdd.createOrReplaceTempView("movie_query")
生成临时试图,相当于就是一个表,可以进行写sql进行相关的一些操作,比如查询电影ID为242的数据
3. 获取DataFrame中某一列的值
movie_rdd.select('product_id').distinct().show()
比如这里查看product_id列有哪些取值
如果想要将结果赋值,那么就在后面写上collect()那么就会存在数组中
result=movie_rdd.select('product_id').distinct().collect()
那么常用的一些DataFrame操作就都可以使用啦, 就不一一列举啦!