Meituan 交互式系统浅析（3）：Spark SQL 数据倾斜的解决

对于开发过程中可能出现的数据倾斜问题，可以采用双重 group by（两阶段聚合）的方法来解决。

分析:

 可以借鉴 Spark Core 中解决数据倾斜时常用的两阶段聚合（局部聚合 + 全局聚合）：
 局部——给每个 key 加上随机前缀将其打散，再通过 group by 完成局部统计。这样同一个热点 key 会被分散到多个分组中并行聚合，避免单个 task 处理过多数据；
 全局——去掉前缀，再通过 group by 对局部结果完成全局统计。
/**
  * Demonstrates mitigating data skew in Spark SQL with a two-phase ("double group by") aggregation
  * over the words of a text file:
  *
  *  1. salt every word with a random prefix `k_` (k in [0, SaltBuckets)) so one hot word is spread
  *     over several groups;
  *  2. count each salted word (local aggregation);
  *  3. strip the prefix from each local group;
  *  4. sum the partial counts per original word (global aggregation).
  *
  * Usage: the optional first program argument is the input text file; it defaults to
  * `E:/data/hello.log`.
  */
object _05SparkSQLOptimizationOps {

    /** Input file used when no path is passed on the command line. */
    private val DefaultInputPath = "E:/data/hello.log"

    /** Number of distinct random prefixes used to salt each word. */
    private val SaltBuckets = 2

    // One row per word of every line in table `test` (split on single spaces, so blank lines and
    // repeated spaces yield empty words).
    private val WordsSql = "select explode(split(line, ' ')) as word from test"

    // Step 1: salt every word with a random prefix.
    private val SaltedSql =
        s"select addRandomPrefix(w.word, $SaltBuckets) as p_word from ($WordsSql) w"

    // Step 2: local aggregation over the salted words.
    private val LocalCountSql =
        s"select p.p_word, count(p.p_word) as p_count from ($SaltedSql) p group by p.p_word"

    // Step 3: local aggregation, reporting each salted group under its un-prefixed word.
    private val UnsaltedCountSql =
        s"select removePrefix(p.p_word) as r_word, count(p.p_word) as r_count " +
            s"from ($SaltedSql) p group by p.p_word"

    // Step 4: global aggregation - sum the partial counts of each original word.
    private val GlobalCountSql =
        s"select r.r_word as field, sum(r.r_count) as sum from ($UnsaltedCountSql) r group by r.r_word"

    /**
      * Runs the four steps of the two-phase aggregation and prints each intermediate result.
      *
      * @param args optional; `args(0)` overrides the input file path
      */
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        // NOTE(review): the original name "org.project-spark" looks like a typo of Spark 1.x's
        // shaded-Jetty package "org.spark-project" - confirm against the Spark version in use.
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)

        val inputPath = args.headOption.getOrElse(DefaultInputPath)
        val conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName(_05SparkSQLOptimizationOps.getClass.getSimpleName)

        val sc = new SparkContext(conf)
        // Stop the context even if a query fails, so the local Spark resources are released.
        try {
            val sqlContext = new SQLContext(sc)

            // Expose the two helpers below as SQL functions.
            // NOTE(review): addRandomPrefix is non-deterministic; confirm the Spark version in use
            // evaluates it once per row (it is not marked non-deterministic here).
            sqlContext.udf.register[String, String, Int]("addRandomPrefix", (field, num) => addRandomPrefix(field, num))
            sqlContext.udf.register[String, String]("removePrefix", field => removePrefix(field))

            // Every line of the input file becomes one row of the single-column table `test`.
            sqlContext.read.text(inputPath).toDF("line").registerTempTable("test")
            // Alternative step-by-step variant using temp tables:
            // groupByOps1(sqlContext)

            // 1. add the random prefix
            sqlContext.sql(SaltedSql).show()
            // 2. local aggregation
            sqlContext.sql(LocalCountSql).show()
            // 3. strip the prefix
            sqlContext.sql(UnsaltedCountSql).show()
            // 4. global aggregation
            sqlContext.sql(GlobalCountSql).show()
        } finally {
            sc.stop()
        }
    }

    /**
      * Same two-phase aggregation as `main`, but materializing every step as a temp table.
      * Requires table `test` and both UDFs to be registered on `sqlContext`; prints the final counts.
      */
    private def groupByOps1(sqlContext: SQLContext) = {
        // split lines into words
        sqlContext.sql("select explode(split(line, ' ')) as word from test")
            .registerTempTable("word_tmp")
        // add the random prefix
        sqlContext.sql(s"select addRandomPrefix(word, $SaltBuckets) as p_word from word_tmp")
            .registerTempTable("prefix_word_tmp")
        // local aggregation
        sqlContext.sql("select p_word, count(p_word) as p_count from prefix_word_tmp group by p_word")
            .registerTempTable("prefix_count_word_tmp")
        // strip the prefix
        sqlContext.sql("select removePrefix(p_word) as r_word, p_count as r_count from prefix_count_word_tmp")
            .registerTempTable("r_prefix_count_word_tmp")
        // global aggregation
        sqlContext.sql("select r_word, sum(r_count) r_sum from r_prefix_count_word_tmp group by r_word").show()
    }

    /**
      * Adds a random prefix to a field.
      *
      * @param field the value to salt
      * @param num   exclusive upper bound of the prefix; the prefix is drawn uniformly from [0, num)
      * @return `"<prefix>_<field>"`, e.g. `"1_hello"`
      * @throws IllegalArgumentException if `num` is not positive
      */
    def addRandomPrefix(field: String, num: Int): String = {
        require(num > 0, s"num must be positive, got $num")
        // ThreadLocalRandom avoids allocating a new Random for every row.
        val prefix = java.util.concurrent.ThreadLocalRandom.current().nextInt(num)
        s"${prefix}_$field"
    }

    /**
      * Removes the random prefix added by [[addRandomPrefix]].
      *
      * Only the text up to and including the FIRST `_` is stripped, so words that themselves contain
      * `_` (e.g. `"0_foo_bar"` -> `"foo_bar"`) and empty words (`"0_"` -> `""`) are restored intact.
      * A value without any `_` is returned unchanged; `null` yields `null`.
      *
      * @param field a value of the form `"<prefix>_<original>"`
      * @return the original value without its prefix
      */
    def removePrefix(field: String): String =
        Option(field).map { value =>
            val separator = value.indexOf('_')
            if (separator < 0) value else value.substring(separator + 1)
        }.orNull
}

上一篇:上海上传数据重复-sftp端口关闭


下一篇:基于Walle的多渠道快速打包自动脚本