#!/bin/bash #jdk export JAVA_HOME=/usr/java/jdk1.8.0_162 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin #hadoop export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24 export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native #spark #export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0 #export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin realtime_queue=root my_job_name="userhierarchy" main_class="com.df.App" /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[2] \ --name ${my_job_name} \ --class ${main_class} \ --driver-memory 2g \ --executor-memory 2g \ --executor-cores 8 \ --queue ${realtime_queue} \ /opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar #--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8787" \
1.首先将数据量减少到5000 可以正常执行,测试1w条 又不行。 考虑到会不会因为产生笛卡尔积。 因为5000条产生笛卡尔积是5000*5000 ,而3w * 3w 的数据量也还是不小。 故对所有使用join的SQL都进行排查, 发现没有笛卡尔积。 然后尝试使用IN 来代替 left join 。 还是不行。 然后通过使用where代替having。 因为 having在SQL中的执行在最后,需要对表进行分组 排序完之后在做having的条件筛选,而 where是先进行条件筛选之后在对剩余数据进行处理。效率会比having要高。 但是程序还是卡在上图位置不动。 2.转变思路对 参数进行优化,查阅资料发现一个参数 spark.locality.wait 数据本地化等待时间 为什么会有这个参数呢? spark在driver上 ,对application的每个state的task分配之前,会先计算出每个task要计算的是哪个分片数据(RDD上的某个partition),spark分配的task的算法,优先希望每个task签好分配到它所要计算的数据的节点上,这样就尽可能的避免了网络间数据传输。 但实际上,有时候 ,task并没有分配到它所要计算的数据的节点上,因为有可能那个节点的计算资源和计算能力满了,因为task处理数据是需要计算资源的,所以通常来说spark会等待一段时间,看是否能将task分配到它要处理数据所在节点上,这个等待时长默为3s(3s不是绝对的,针对不同的本地化级别可以设置不同等待时长)。如果超过等待时长,无法计算等待,就会选择一个性能比较差的本地化级别,比如:task分配到距离它所要处理数据节点比较近的一个节点上,然后传输数据进行计算。 而对于我们来说最好是,task正好分配到它要处理数据所在节点上,这样直接从本地executor对应的blockManager中获取数据,纯内存传出数据,或带有部分磁盘IO。 本地化级别: 本地化就是指task被分配要处理部分数据,task和它要处理的数据可能会在不同的节点位置,根据这种位置关系又5种不同的本地化级别: 1.PROCESS_LOCAL:进程本地化,计算数据的task由某个executor执行,数据也就在这个executor对应的BlockManager。这种本地化级别 性能最好 2.NODE_LOCAL:节点本地化。第一种情况,数据作为HDFS block数据块就在节点上, 而task节点是在某个executor上运行;第二种情况,task和它要处理的数据,在同一节点的不同executor上,数据需要在进程之间传输 3.NO_PREF: 对于task来说,数据在哪里获取都一样,无好坏之分 4.RACK_LOCAL:机架本地化,task和它要处理的数据在同一机架的不同节点上, 数据需要通过网络在节点之间传输 5.ANY:task和它要处理的数据可能在集群的任何地方,而且不在同一机架上(RACK),数据要跨机架传输,性能最差。 何时调节该参数呢? 在本地模式下跑程序观察spark作业的日志,查看starting task........ PROCESS_LOCAL NODE_LOCAL 观察大多数task的数据本地化级别,如果大多数是NODE_LOCAL / ANY 那么可以调节,观察时间是否有缩短,反复几次 寻找最优的时间。 添加配置后的脚本
#!/bin/bash #jdk export JAVA_HOME=/usr/java/jdk1.8.0_162 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin #hadoop export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24 export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native #spark #export SPARK_HOME=/opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0 #export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin realtime_queue=root my_job_name="userhierarchy" main_class="com.df.App" /opt/cdh/spark-2.1.0-bin-2.6.0-cdh5.14.0/bin/spark-submit --master local[3] \ --name ${my_job_name} \ --class ${main_class} \ --driver-memory 3g \ --executor-memory 3g \ --executor-cores 5 \ --conf spark.executor.memoryOverhead=2048M \ --conf spark.locality.wait=10 \ --queue ${realtime_queue} \ /opt/cdh/submit/userhierarchy/user_hierarchy-1.0-SNAPSHOT.jar order_info_test