对于在开发过程中可能出现的数据倾斜问题,可提供一种利用双重group by的方法来解决。
分析:
可以借鉴 SparkCore 中解决数据倾斜时提到的两阶段聚合(局部聚合 + 全局聚合):
局部——给每个 key 加上随机前缀将其打散,再通过 groupBy 完成局部统计;
全局——去掉前缀,再通过 groupBy 完成全局统计。
/**
 * Demonstrates mitigating data skew in Spark SQL with a two-stage ("double group by") aggregation,
 * using a word count over a text file:
 *   1. local stage  - scatter each word by prepending a random prefix, then group by the prefixed word;
 *   2. global stage - strip the prefix, then group by the original word and sum the partial counts.
 */
object _05SparkSQLOptimizationOps {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath = "E:/data/hello.log"

  /** Number of distinct random prefixes each word is scattered into during the local stage. */
  private val PrefixCount = 2

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input text file path (defaults to `E:/data/hello.log`)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    // Spark 1.x relocates its shaded Jetty under "org.spark-project"; "org.project-spark" matched no logger.
    Logger.getLogger("org.spark-project").setLevel(Level.WARN)

    // getSimpleName on a Scala object's class ends with '$'; strip it for a cleaner application name.
    val appName = _05SparkSQLOptimizationOps.getClass.getSimpleName.stripSuffix("$")
    val conf = new SparkConf().setMaster("local[2]").setAppName(appName)
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      registerUdfs(sqlContext)

      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val df = sqlContext.read.text(inputPath).toDF("line")
      // df.show()
      // SQL style: expose the lines as the "test" temp table.
      df.registerTempTable("test")
      // groupByOps1(sqlContext)

      showSqlSteps(sqlContext)
    } finally {
      // Release the SparkContext even when one of the queries fails.
      sc.stop()
    }
  }

  /**
   * Registers the custom UDFs `addRandomPrefix(string, int)` and `removePrefix(string)` used by the SQL queries.
   *
   * NOTE(review): Spark may treat registered Scala UDFs as deterministic; confirm the optimizer does not
   * re-evaluate addRandomPrefix for the same row in a way that changes its result.
   */
  private def registerUdfs(sqlContext: SQLContext): Unit = {
    sqlContext.udf.register[String, String, Int]("addRandomPrefix", (field, num) => addRandomPrefix(field, num))
    sqlContext.udf.register[String, String]("removePrefix", field => removePrefix(field))
  }

  /**
   * Runs each step of the two-stage aggregation over the `test` temp table as nested SQL and shows its result.
   * Each step re-executes the whole pipeline, so the random prefixes differ between the printed steps.
   */
  private def showSqlSteps(sqlContext: SQLContext): Unit = {
    // Split every line into words.
    val wordsSql = "select explode(split(line, ' ')) as word from test"
    // 1. Add a random prefix in [0, PrefixCount) to every word.
    val prefixedSql = s"select addRandomPrefix(w.word, $PrefixCount) as p_word from ($wordsSql) w"
    // 2. Local aggregation: count occurrences per prefixed word.
    val localSql = s"select p.p_word, count(p.p_word) as p_count from ($prefixedSql) p group by p.p_word"
    // 3. Strip the prefix from each locally aggregated key (counts are still per prefixed word).
    val strippedSql =
      s"select removePrefix(p.p_word) as r_word, count(p.p_word) as r_count from ($prefixedSql) p group by p.p_word"
    // 4. Global aggregation: sum the partial counts per original word.
    val globalSql = s"select r.r_word as field, sum(r.r_count) as sum from ($strippedSql) r group by r.r_word"

    Seq(prefixedSql, localSql, strippedSql, globalSql).foreach(query => sqlContext.sql(query).show())
  }

  /**
   * Same two-stage aggregation as [[showSqlSteps]], but each step is materialized as its own temp table.
   * Only the final global aggregation is shown.
   */
  private def groupByOps1(sqlContext: SQLContext) = {
    // Split lines into words.
    sqlContext.sql("select explode(split(line, ' ')) as word from test")
      .registerTempTable("word_tmp")
    // Add a random prefix.
    sqlContext.sql(s"select addRandomPrefix(word, $PrefixCount) as p_word from word_tmp")
      .registerTempTable("prefix_word_tmp")
    // Local aggregation.
    sqlContext.sql("select p_word, count(p_word) as p_count from prefix_word_tmp group by p_word")
      .registerTempTable("prefix_count_word_tmp")
    // Remove the prefix.
    sqlContext.sql("select removePrefix(p_word) as r_word, p_count as r_count from prefix_count_word_tmp")
      .registerTempTable("r_prefix_count_word_tmp")
    // Global aggregation.
    sqlContext.sql("select r_word, sum(r_count) r_sum from r_prefix_count_word_tmp group by r_word").show()
  }

  /**
   * Prepends a random prefix to `field`.
   *
   * @param field the value to scatter
   * @param num   number of possible prefixes; the prefix is drawn uniformly from [0, num)
   * @return `"<prefix>_<field>"`
   * @throws IllegalArgumentException if `num` is not positive
   */
  def addRandomPrefix(field: String, num: Int): String = {
    // ThreadLocalRandom avoids allocating a new Random for every row the UDF processes.
    val prefix = java.util.concurrent.ThreadLocalRandom.current().nextInt(num)
    s"${prefix}_$field"
  }

  /**
   * Removes the prefix added by [[addRandomPrefix]], i.e. everything up to and including the first '_'.
   *
   * Only the first '_' is treated as the separator, so words that themselves contain '_' and empty words
   * (e.g. `"0_"`, produced by consecutive spaces or empty lines) are restored intact instead of being
   * truncated or throwing. A value without any '_' is returned unchanged.
   *
   * @param field a value of the form `"<prefix>_<original>"`
   * @return the original value
   */
  def removePrefix(field: String): String = field.substring(field.indexOf('_') + 1)
}