1. Overview
Components
Execution mechanism
Repost: SparkSQL – Getting to Know Catalyst from 0 to 1: https://blog.csdn.net/qq_36421826/article/details/81988157
Deep Dive into Spark SQL's Catalyst Optimizer (original translation)
More efficient
Query optimization
Optimization: move the filter earlier in the plan (predicate pushdown)
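A minimal sketch of this rule in action (the tables and column names below are made up for illustration): the filter is written after the join, but in the optimized logical plan printed by explain(true), Catalyst moves the Filter below the Join so fewer rows reach the join.

import org.apache.spark.sql.SparkSession

object FilterPushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Filter Pushdown Demo")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val users  = Seq((1, "alice"), (2, "bob")).toDF("userId", "name")
    val orders = Seq((1, 100L), (2, 5L), (1, 42L)).toDF("userId", "amount")

    // The filter is written after the join ...
    val joined = users.join(orders, "userId").filter($"amount" > 50)

    // ... but the optimized logical plan shows the Filter pushed below the Join.
    joined.explain(true)

    spark.close()
  }
}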
Data source optimization
Compile-time optimization: code generation
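To see code generation at work, a sketch (any small query will do): operators prefixed with '*' in the physical plan have been fused by whole-stage code generation into a single compiled function, and Spark's debug helper prints the generated Java source.

import org.apache.spark.sql.SparkSession

object CodegenDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Codegen Demo")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val df = (1 to 1000).toDF("n").filter($"n" % 2 === 0).selectExpr("sum(n)")

    // Operators marked with '*' are fused into one generated Java function.
    df.explain()

    // Print the generated Java source (debug helper bundled with Spark).
    import org.apache.spark.sql.execution.debug._
    df.debugCodegen()

    spark.close()
  }
}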
Dataset and DataFrame
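The difference in one sketch: a DataFrame is an untyped Dataset[Row] whose columns are resolved at runtime, while a Dataset[T] keeps the element type so the compiler checks field access. (The Person case class and the sample rows are made up for illustration.)

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Person(name: String, age: Int)

object DatasetVsDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Dataset vs DataFrame")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // DataFrame = Dataset[Row]: columns are addressed by name and checked at runtime.
    val df: DataFrame = Seq(Person("alice", 29), Person("bob", 31)).toDF()
    df.select($"name", $"age" + 1).show()

    // Dataset[Person]: fields are checked at compile time and lambdas receive typed objects.
    val ds: Dataset[Person] = df.as[Person]
    ds.filter(p => p.age > 30).map(_.name).show()

    spark.close()
  }
}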
Data sources
Parquet files
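A sketch of reading and writing Parquet (the paths and the column name are placeholders):

import org.apache.spark.sql.SparkSession

object ParquetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Parquet Example")
      .master("local")
      .getOrCreate()

    // Parquet is Spark SQL's default format; the schema is stored in the file itself.
    val df = spark.read.parquet("data/users.parquet")
    df.printSchema()

    // Only the selected columns are read from disk (column pruning).
    df.select("name").write.mode("overwrite").parquet("data/names.parquet")

    spark.close()
  }
}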
JSON files
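A sketch of reading JSON (the path is a placeholder); spark.read.json expects one JSON object per line and infers the schema:

import org.apache.spark.sql.SparkSession

object JsonExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JSON Example")
      .master("local")
      .getOrCreate()

    // One JSON object per line; the schema is inferred from the data.
    val df = spark.read.json("data/people.json")
    df.printSchema()

    df.createOrReplaceTempView("people")
    spark.sql("SELECT name FROM people WHERE age > 20").show()

    spark.close()
  }
}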
Reading files from Hive
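Reading Hive data goes through enableHiveSupport(); a sketch that assumes hive-site.xml is on the classpath and that a Hive table named src already exists:

import org.apache.spark.sql.SparkSession

object HiveExample {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() connects the session to the Hive metastore.
    val spark = SparkSession.builder()
      .appName("Hive Example")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // Hive tables can then be queried like any other table (src is assumed to exist).
    spark.sql("SELECT key, value FROM src LIMIT 10").show()

    spark.close()
  }
}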
External data sources: spark.read.format
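spark.read.format is the generic entry point for any data source; a sketch with CSV (the path is a placeholder):

import org.apache.spark.sql.SparkSession

object ExternalSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("External Source Example")
      .master("local")
      .getOrCreate()

    // Pick the format, set its options, then load.
    val csv = spark.read
      .format("csv")
      .option("header", "true")        // first line holds the column names
      .option("inferSchema", "true")   // sample the data to guess column types
      .load("data/people.csv")
    csv.show()

    // The same pattern works for json, parquet, orc, jdbc and third-party formats.
    spark.close()
  }
}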
2. Programming
Standard workflow
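The workflow, sketched end to end (the path and view name are placeholders): build a SparkSession, load data into a DataFrame, register a temporary view, query it, and stop the session. The log-analysis examples below all follow this shape.

import org.apache.spark.sql.SparkSession

object WorkflowSketch {
  def main(args: Array[String]): Unit = {
    // 1. Build the session
    val spark = SparkSession.builder()
      .appName("Workflow Sketch")
      .master("local")
      .getOrCreate()

    // 2. Load data into a DataFrame
    val df = spark.read.json("data/people.json")

    // 3. Register it as a temporary view
    df.createOrReplaceTempView("people")

    // 4. Query it with SQL (or the DataFrame DSL) and act on the result
    spark.sql("SELECT name, age FROM people WHERE age > 20").show()

    // 5. Stop the session
    spark.close()
  }
}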
API: SQL vs. the DataFrame DSL
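Both APIs express the same query; a sketch with made-up data showing the two side by side:

import org.apache.spark.sql.SparkSession

object SqlVsDsl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SQL vs DSL")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val people = Seq(("alice", 29), ("bob", 31), ("alice", 35)).toDF("name", "age")

    // SQL: the query is a string run against a registered view.
    people.createOrReplaceTempView("people")
    spark.sql("SELECT name, COUNT(*) AS total FROM people GROUP BY name").show()

    // DataFrame DSL: the same aggregation built from method calls on the DataFrame.
    people.groupBy("name").count().show()

    spark.close()
  }
}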
Content-size statistics: total content size, number of log lines, minimum content size, maximum content size
package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Parse each log line into an ApacheAccessLog and register the result as a view.
    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()
    accessLogs.createOrReplaceTempView("logs")

    // Content-size statistics: total size, number of log lines, minimum size, maximum size.
    val contentSizeStats: Row = spark.sql(
      "SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs").first()
    val sum = contentSizeStats.getLong(0)
    val count = contentSizeStats.getLong(1)
    val min = contentSizeStats.getLong(2)
    val max = contentSizeStats.getLong(3)
    println("sum %s, count %s, min %s, max %s".format(sum, count, min, max))
    println("avg %s".format(sum / count))

    spark.close()
  }
}
ApacheAccessLog
package org.sparkcourse.log

case class ApacheAccessLog(ipAddress: String,
                           clientIdentd: String,
                           userId: String,
                           dateTime: String,
                           method: String,
                           endpoint: String,
                           protocol: String,
                           responseCode: Int,
                           contentSize: Long)

object ApacheAccessLog {
  // Example line:
  // 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s+\-\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def parseLogLine(log: String): ApacheAccessLog = {
    log match {
      case PATTERN(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode, contentSize) =>
        ApacheAccessLog(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode.toInt, contentSize.toLong)
      case _ => throw new RuntimeException(s"""Cannot parse log line: $log""")
    }
  }
}
Count the number of requests for each response code
package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()
    accessLogs.createOrReplaceTempView("logs")

    // Count the number of requests for each response code.
    val responseCodeToCount = spark.sql(
      "SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 100")
      .map(row => (row.getInt(0), row.getLong(1)))
      .collect()
    responseCodeToCount.foreach(print(_))
  }
}
Find the IP addresses that accessed the server more than 10 times
package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()
    accessLogs.createOrReplaceTempView("logs")

    // Find the IP addresses that accessed the server more than 10 times.
    val ipAddresses = spark.sql(
      "SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
      .map(row => row.getString(0))
      .collect()
    ipAddresses.foreach(println(_))
  }
}
Find the most frequently accessed endpoints
package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()
    accessLogs.createOrReplaceTempView("logs")

    // Find the ten most frequently accessed endpoints.
    val topEndpoints = spark.sql(
      "SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
      .map(row => (row.getString(0), row.getLong(1)))
      .collect()
    topEndpoints.foreach(println(_))
  }
}
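For comparison with the SQL string above, the same top-endpoints query in the DataFrame DSL; a sketch that assumes the accessLogs DataFrame from the examples above is in scope:

import org.apache.spark.sql.functions.col

// accessLogs is the DataFrame built in the examples above
val topEndpointsDsl = accessLogs
  .groupBy("endpoint")
  .count()                      // adds a column named "count"
  .orderBy(col("count").desc)
  .limit(10)
topEndpointsDsl.show()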