SparkSQL /DataFrame /Spark RDD谁快?

如题所示,SparkSQL /DataFrame /Spark RDD谁快?

按照官方宣传以及大部分人的理解,SparkSQL和DataFrame虽然基于RDD,但是由于对RDD做了优化,所以性能会优于RDD。

之前一直也是这么理解和操作的,直到最近遇到了一个场景,打破了这种不太准确的认识。

某些场景下,RDD要比DataFrame快,性能有天壤之别。

 

需求如下

以下两份数据求交集,结果输出url。

数据一,json格式,地址我们用path_json表示,大小10T,每一行数据格式:{"id":"md5字符串", "url":"https://www.thesaurus.com/","title":"sysnonyms and antonyms",xxx},大概20来个字段;

数据二,csv格式,地址我们用path_csv表示,大小50G,每一行数据格式:name url,2个字段,用\t隔开。

 

拿到需求后,迅速瞟了一眼数据,爽快答应需求方分分钟搞定。

此时此刻,必须得祭出宇宙Top N的IDE,结合我30多年的人生阅历和代码经验,瞬间雷光电扇,惊雷骤起,一顿操作猛如虎,天空飘过以下几行代码:

(老铁们,请自行安装python,pyspark,pycharm)

SparkSQL /DataFrame /Spark RDD谁快?

方案一


from pyspark.sql import SparkSession
def join_it():
    path_json = 'hdfs://i/love/you/'  # 数据大小10T, 5万分区
    path_csv  = 'hdfs://you/love/me'  # 数据大小50G
    path_save = 'hdfs://we/are/together'

    df1 = spark.read.json(path_json).select('url')
    df2 = spark.read.option('sep', '\t').schema('name string, url string').csv(path_csv)
    df1.join(df2) \
        .select(df1.url)\
        .coalesce(10000) \
        .write \
        .mode('overwrite') \
        .option('sep', '\t') \
        .csv(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

spark-submit提交任务到spark集群,参数根据自己的实际情况自行修改。

spark-submit \
    --master yarn \
    --deploy-mode client \
    --name 'i-live-you' \
    --queue 'you-love-me' \
    --driver-cores 10 \
    --driver-memory 30g \
    --num-executors 3000 \
    --executor-memory 30g \
    --executor-cores 4 \
    --archives 'hdfs://your-python-path-on-hdfs#pkg'
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON='集群里面的python地址' \
    --conf spark.sql.shuffle.partitions=50000 \
    --conf spark.default.parallelism=50000 \
    --conf spark.task.maxFailures=20 \
    your-spark-script.py

如果需要在本机调试代码,spark的生成需要替换成如下,然后直接运行。调试通过后,仍然需要按照上述方式spark-submit提交任务到集群运行,由于数据量很大,需要在集群运行才能看出性能差异。

spark = SparkSession.builder.appName('pyspark').master('local[*]').getOrCreate()

又是一顿猛操作,提交任务后,嗡嗡叫的肚子提醒我,要去厕所烧一根香,拜拜佛,佛祖保佑无bug。

SparkSQL /DataFrame /Spark RDD谁快?

深圳的夏天,依旧不负众望的燥热。热情似火的太阳,伴着她最爱的紫外线和电磁波,循着外太空固定的轨道,迈着30万公里/秒的矫健步伐,到达这颗承载70亿人的蓝色星球,穿透层层蓝天白云,无私的照亮着广袤的深圳大地。

酷暑让人思想活跃,思绪万千。扯得有点远了,重来SparkSQL /DataFrame /Spark RDD谁快?不重来了,接着写bug。

 

作为一个谨记厕所文化的人,蹲坑5分钟,方便你我他;蹲坑半小时,痔疮等着你。我选择了后者。

半小时过去了,时间随着大A股的大跌,瞬间来到了下午的收盘时间。果不其然,又一个下跌如期而至,就在这一刻,体内的混浊之气伴着伴随着收跌的股市排出体外。收拾干净后,我带着满身厕所的芬芳,回到了座位上。

再次打开电脑屏幕,spark任务还在慢悠悠的读取json文件,半小时才读取300G左右,10T的json文件按照这个速度,全部读完的好几天。此方案不可用。

 

方案二

果断改成RDD,然后用intersection求交集,果然快很多,10T跟50G求交集,12000cores,5分钟出结果。Spark任务提交同方案一,不再赘述。此方案可行

屁颠屁颠的把结果交付给需求方,大佬甚是满意地流出了开心的泪水SparkSQL /DataFrame /Spark RDD谁快?

import json
from pyspark.sql import SparkSession

def join_it():
    path_json = 'hdfs://i/love/you/'  # 数据大小10T, 5万分区
    path_csv  = 'hdfs://you/love/me'  # 数据大小50G
    path_save = 'hdfs://we/are/together'
    #
    rdd1 = sc.textFile(path_json).map(lambda v: json.loads(v).get('url', '')).coalesce(50000)
    rdd2 = sc.textFile(path_csv).map(lambda v: v.split('\t')[1])
    rdd1.intersection(rdd2).coalesce(20000).saveAsTextFile(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

 

方案三

离下班实际还有30分钟,作为一个对技术有追求的资深码农SparkSQL /DataFrame /Spark RDD谁快?,尝试用SparkSQL实现该功能。祭出代码,以资各位看官共享,新能跟RDD不相上下。此方案也可取。

import json
from pyspark.sql import SparkSession

def join_it():
    path_json = 'hdfs://i/love/you/'  # 数据大小10T, 5万分区
    path_csv  = 'hdfs://you/love/me'  # 数据大小50G
    path_save = 'hdfs://we/are/together'
    #
    sc.textFile(path_json).map(lambda v: (json.loads(v).get('url', ''),)).toDF(['url']).createOrReplaceTempView('a')
    spark.read.option('sep', '\t').schema('name string, url string').csv(path_csv).createOrReplaceTempView('b')

    sql = '''
    SELECT
        a.url
    FROM
        a
    JOIN
        b
    ON
        a.url=b.url
    '''
    spark.sql(sql).coalesce(20000).write.mode('overwrite').option('sep', '\t').csv(path_save)


if __name__ == '__main__':
    spark = SparkSession.builder.appName('pyspark').getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    #
    join_it()
    #
    spark.stop()

 

总结:

当遇到源数据是体量比较大的json或其他格式的时候,不要用spark.read的形式直接导入到DataFrame。

那要咋弄SparkSQL /DataFrame /Spark RDD谁快??可以先用RDD把源数据加载进来,然后再转化成DataFrame,后面用SparkSQL进行操作,如此可达到较好的性能效果。

 

上一篇:同样的SQL语句在SparkSQL中运行和在hive运行,结果不同


下一篇:SparkSQL统一数据的加载与落地