spark的学习-05

SparkSql

结构化数据与非结构化数据

结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc

结构化的表:数据库中表的数据:MySQL、Oracle、Hive

我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢

使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema

一个经典的wordcount案例:

代码如下:(里面有sql和dsl两种写法)

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    print(spark)

    # 创建一个DataFrame对象,读取数据
    df = spark.read.text("../../datas/wordcount/data.txt")
    # 创建一个临时表,表名为 wordcount
    df.createOrReplaceTempView("wordcount")

    # 第一种写法,使用sparksql
    spark.sql("""
        with t as ( 
            select word from wordcount lateral view explode(split(value," ")) wordtemp as word
        ),t2 as (
            select trim(word) word from t where trim(word) != ""
        )select word,count(1) countNum from t2 group by word order by countNum desc
        
    """).show()

    # 第二种写法,使用 dsl
    df.select(F.explode(F.split("value"," ")).alias("word")) \
     .where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()
    #这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")

    # 还可以这样写
    df.select(F.explode(F.split("value"," ")).alias("word")) \
    .where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

    spark.stop()

以上的代码还可以使用with进行优化

补充:

with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭

什么时候可以使用with呢

源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化

优化过后的代码: (此时就不需要在手动stop关闭了)

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 创建spark对象
    with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate() as spark:


        # 创建一个DataFrame对象,读取数据
        df = spark.read.text("../../datas/wordcount/data.txt")
        # 创建一个临时表,表名为 wordcount
        df.createOrReplaceTempView("wordcount")

        # 第一种写法,使用sparksql
        spark.sql("""
            with t as ( 
                select word from wordcount lateral view explode(split(value," ")) wordtemp as word
            ),t2 as (
                select trim(word) word from t where trim(word) != ""
            )select word,count(1) countNum from t2 group by word order by countNum desc
            
        """).show()

        # 第二种写法,使用 dsl
        df.select(F.explode(F.split("value"," ")).alias("word")) \
         .where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()
        #这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")

        # 还可以这样写
        df.select(F.explode(F.split("value"," ")).alias("word")) \
        .where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

一个案例:

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】

数据如下:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化

可以使用RDD的算子将数据改为我们想要的格式化数据

也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据

写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例

使用RDD+SparkSQL

代码如下:

import os
import re

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'


    # 创建spark对象
    with SparkSession.builder.master("local[2]").appName("MovieTop10").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate() as spark:
        print(spark)

        rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \
            .filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \
            .toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")

        # spark.sql("""
        #     select * from rating
        # """).show()


        movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \
        .map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \
        .toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")

        # spark.sql("""
        #     select * from movie
        # """).show(truncate=False)

        #统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
        spark.sql("""
                  select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_id
                   group by m.movie_name having countNum >2000 order by avgRate desc limit 10
               """).show(truncate=False)


        # 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()
        spark.sql("""
            with t as (
                select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_id
                   group by m.movie_name having countNum >2000
            ),t2 as (
                select *,dense_rank() over(order by avgRate desc) paiming from t
             ) select * from t2 where paiming <= 10
        """).show()
复习 排名函数:
1、row_number()

row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

效果如下:
98                1
97                2
97                3
96                4
95                5
95                6

没有并列名次情况,顺序递增
2、rank()

生成数据项在分组中的排名,排名相等会在名次中留下空位

效果如下:
98                1
97                2
97                2
96                4
95                5
95                5
94                7
有并列名次情况,顺序跳跃递增
3、dense_rank()

生成数据项在分组中的排名,排名相等会在名次中不会留下空位

效果如下:
98                1
97                2
97                2
96                3
95                4
95                4
94                5
有并列名次情况,顺序递增
只使用 SparkSQL:

以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来

通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据

df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")

#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""
    with m1 as (
        select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1
    ),r1 as ( 
        select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1
     )select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_id
     group by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)

# 同样也可以写成排名函数
spark.sql("""
    with m1 as (
        select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1
    ),r1 as ( 
        select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1
     ),t as ( 
        select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_id
     group by m1.movie_name having countNum >2000
      ),t2 as ( 
            select *,dense_rank() over(order by avgRote desc) paiming from t
       )select * from t2 where paiming <= 10
""").show(truncate=False)
上一篇:vue2 自动化部署 shell 脚本


下一篇:打造自己的RAG解析大模型:Labelme版面标注并顺利完成训练