The scenario this post tries to implement is the following: as an exercise in learning Spark, write Scala code that uses Spark to parse an 800 MB Tomcat log file, print the first 10 ERROR-level records within a given time window, count the number of log records per minute, and store the statistics in a MySQL database. I had written the same processing logic in Java before, but after picking up Scala I really find it far more convenient than Java for this kind of computation. If you haven't learned Scala yet, hurry up……
Technical Points
- Write the log file into HDFS at the relative path "nova.log" (a small upload sketch follows this list)
- Take care when handling multi-line Java stack traces in the log
- Store all of the parsed log records in Spark SQL or in a Hive data warehouse
- Query the first 10 ERROR-level records within a given time window via SQL
- Count the number of log records per minute and store the result in MySQL, so that upper-layer applications can use the computed results directly
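As mentioned in the first point, the log file has to be uploaded to HDFS before the job can read it. Below is a minimal sketch of one way to do this with the Hadoop FileSystem API; the local source path /opt/tomcat/logs/nova.log is a placeholder, and the sketch assumes the Hadoop configuration on the classpath points at the target cluster, so the relative destination "nova.log" lands in the submitting user's HDFS home directory, which is exactly where sc.textFile("nova.log") will look later.

```scala
// Hedged sketch: copy the local Tomcat log into HDFS so that sc.textFile("nova.log") can find it.
// The local source path below is a placeholder.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object UploadLog {

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration()) // picks up core-site.xml / hdfs-site.xml from the classpath
    // Relative destination paths resolve against the user's HDFS home directory (/user/<username>/).
    fs.copyFromLocalFile(new Path("/opt/tomcat/logs/nova.log"), new Path("nova.log"))
    fs.close()
  }
}
```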
Before and After Parsing
Before parsing:
After parsing:
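As a concrete illustration, the self-contained sketch below (plain Scala, no Spark required) runs the same regular expression that LoggerApp uses on a hypothetical multi-line record, a header line followed by a Java stack trace, and prints the five extracted fields. The sample timestamp, thread, logger name and message are made up for the example.

```scala
// A self-contained sketch of what the parser does; the sample record is hypothetical,
// real output depends on the Tomcat/logback pattern in use.
object ParseDemo {

  def main(args: Array[String]): Unit = {
    val pattern = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (\\[.*\\]) (.*) (.*) - ([\\s\\S]*)$".r

    // "Before": one logical record spread over several physical lines (a Java stack trace),
    // already re-assembled into a single string the way LoggerApp's reduceByKey step does.
    val record =
      "2016-03-21 11:15:02.032 [http-nio-8080-exec-1] ERROR com.example.NovaService - Failed to load resource\n" +
        "java.lang.NullPointerException\n" +
        "    at com.example.NovaService.load(NovaService.java:42)"

    // "After": the five fields that end up in the logger table.
    record match {
      case pattern(time, thread, level, logger, msg) =>
        println(s"time   = $time")
        println(s"thread = $thread")
        println(s"level  = $level")
        println(s"logger = $logger")
        println(s"msg    = $msg")
      case _ =>
        println("record does not match the expected layout")
    }
  }
}
```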
Parsing Code
LoggerApp.scala:
```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType

/**
 * Log parsing
 */
object LoggerApp {

  def main(args: Array[String]): Unit = {
    println("<!-- parsing started -->")
    val reg = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (\\[.*\\]) (.*) (.*) - ([\\s\\S]*)$"
    val path = "nova.log"
    val sc = new SparkContext(new SparkConf().setAppName("Log parsing"))
    val textRDD = sc.textFile(path)

    /**
     * Re-assemble records that span multiple lines (e.g. Java stack traces):
     * every line matching the pattern becomes a key, and the following
     * non-matching lines are attached to it. Note that `key` is task-local,
     * so continuation lines at the very start of a partition end up under an empty key.
     */
    var key = ""
    val formatRDD = textRDD.map { x =>
      if (x.matches(reg)) {
        key = x
        (key, "")
      } else {
        (key, x)
      }
    }.reduceByKey((a, b) => a + "\n" + b).map(x => x._1 + x._2)

    /**
     * Convert each re-assembled record string into a Logger object.
     */
    val pattern = reg.r
    val loggerRDD: RDD[Logger] = formatRDD.map { x =>
      val pattern(time, thread, level, logger, msg) = x // extract the five fields via the regex
      new Logger(formatDate(time), thread, level, logger, msg)
    }.cache()

    /**
     * TODO Define the schema through class reflection. For some reason I could not get this
     * to work in Scala and wasted quite some time on it; leaving it here to revisit later.
     */
    /*val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(loggerRDD, classOf[Logger]).registerTempTable("logger")*/

    /**
     * Define the schema explicitly.
     */
    val schemaString = "time thread level logger msg"
    val schema = StructType(
      schemaString.split(" ").map(fieldName =>
        if ("time".equals(fieldName)) StructField(fieldName, TimestampType, true)
        else StructField(fieldName, StringType, true)))

    /**
     * Convert Logger objects into Rows.
     */
    val rowRDD = loggerRDD.map { log =>
      Row(formatDate(log.time), log.thread, log.level, log.logger, log.msg)
    }

    /**
     * Query and filter with SQL.
     */
    // val sqlc = bySQLContext(sc, rowRDD, schema)
    val sqlc = byHiveContext(sc, rowRDD, schema)
    val df = sqlc.sql("select * from logger where level='ERROR' and time between '2016-03-21 11:00:00' and '2016-03-21 12:00:00' order by time")
    val errLogRDD = df.map { x =>
      new Logger(formatDate(x.getTimestamp(0)), x.getString(1), x.getString(2), x.getString(3), x.getString(4))
    }
    for (log <- errLogRDD.take(10)) {
      println("time:" + formatDateToStr(log.time))
      println("thread:" + log.thread)
      println("level:" + log.level)
      println("logger:" + log.logger)
      println("msg:" + log.msg)
    }
    println("<!-- parsing finished -->")
  }

  /**
   * Register a temporary table.
   */
  def bySQLContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(rowRDD, schema).registerTempTable("logger")
    sqlc
  }

  /**
   * Create a permanent table; requires Spark-with-Hive integration to be set up beforehand.
   */
  def byHiveContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new HiveContext(sc)
    sqlc.sql("drop table if exists logger")
    sqlc.sql("CREATE TABLE IF NOT EXISTS logger (time TIMESTAMP, thread STRING, level STRING, logger STRING, msg STRING)")
    sqlc.createDataFrame(rowRDD, schema).write.mode("overwrite").saveAsTable("logger")
    sqlc
  }

  def formatDate(str: String): Date =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(str)

  def formatDate(timestamp: java.sql.Timestamp): Date =
    new Date(timestamp.getTime)

  def formatDate(date: Date): java.sql.Timestamp =
    new java.sql.Timestamp(date.getTime)

  def formatDateToStr(date: Date): String =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date)
}
```
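A note on the TODO in the code above: in Spark 1.x the reflection-based schema inference works out of the box for RDDs of case classes, whereas the createDataFrame(rdd, classOf[...]) overload goes through the JavaBean path and looks for getX/setX accessors, which a Scala class with plain vars such as Logger does not generate by default; that is likely why the commented-out attempt failed. The sketch below shows the case-class route; LogRecord is a hypothetical counterpart of Logger introduced only for this example.

```scala
// A sketch of the reflection-based schema definition the TODO refers to, assuming Spark 1.x.
// LogRecord is a hypothetical case-class variant of Logger, used only for this example.
import java.sql.Timestamp

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

case class LogRecord(time: Timestamp, thread: String, level: String, logger: String, msg: String)

object ReflectionSchemaDemo {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Reflection-based schema demo"))
    val sqlc = new SQLContext(sc)
    import sqlc.implicits._ // brings rdd.toDF() into scope

    val records = sc.parallelize(Seq(
      LogRecord(Timestamp.valueOf("2016-03-21 11:15:02.032"),
        "[http-nio-8080-exec-1]", "ERROR", "com.example.NovaService", "Failed to load resource")))

    // Column names and types are derived from the case class fields by reflection.
    records.toDF().registerTempTable("logger")
    sqlc.sql("select * from logger where level = 'ERROR'").show()
  }
}
```

With this approach the explicit StructType definition becomes unnecessary, since the column names and types come straight from the case class fields.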
Logger.scala:
```scala
import java.util.Date

class Logger extends Serializable {

  var time: Date = null
  var thread: String = ""
  var level: String = ""
  var logger: String = ""
  var msg: String = ""

  def this(time: Date, thread: String, level: String, logger: String, msg: String) {
    this()
    this.time = time
    this.thread = thread
    this.level = level
    this.logger = logger
    this.msg = msg
  }
}
```
Counting and Writing to MySQL
LoggerMysqlApp.scala:
```scala
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

object LoggerMysqlApp {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Write statistics to MySQL"))

    /**
     * Load the parsed records from Hive.
     */
    val hivec = new HiveContext(sc)
    val df = hivec.sql("select * from logger")
    val loggerRDD = df.rdd.map { x =>
      new Logger(LoggerApp.formatDate(x.getTimestamp(0)), x.getString(1), x.getString(2), x.getString(3), x.getString(4))
    }

    /**
     * Count the log records per minute and sort by count in descending order.
     */
    val resultRDD = loggerRDD.map { logger =>
      (formatDateToStr(logger.time), 1)
    }.reduceByKey((a, b) => a + b)
      .map(f => Row(f._1, f._2))
      .sortBy(f => f.getInt(1), false, 2)

    for (r <- resultRDD.take(10)) {
      println(r.getString(0) + ":" + r.getInt(1))
    }

    /**
     * Define the schema of the result table.
     */
    val schemaString = "time count"
    val schema = StructType(
      schemaString.split(" ").map(fieldName =>
        if ("time".equals(fieldName)) StructField(fieldName, StringType, true)
        else StructField(fieldName, IntegerType, true)))

    /**
     * Write the per-minute counts into MySQL.
     */
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", "root")
    connectionProperties.setProperty("password", ".")
    new SQLContext(sc).createDataFrame(resultRDD, schema).write.jdbc(
      "jdbc:mysql://192.168.136.128:3306/logger", "logger", connectionProperties)
  }

  def formatDateToStr(date: Date): String =
    new SimpleDateFormat("yyyy-MM-dd HH:mm").format(date)
}
```
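To sanity-check what was written, the table can also be read back from Spark through JDBC. The sketch below assumes the same MySQL host, database, table and credentials as in LoggerMysqlApp, plus a MySQL JDBC driver on the classpath:

```scala
// A minimal read-back sketch to verify what LoggerMysqlApp wrote.
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object LoggerMysqlCheck {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Read back per-minute counts"))
    val sqlc = new SQLContext(sc)

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", ".")

    // Loads the "logger" result table from MySQL into a DataFrame.
    val counts = sqlc.read.jdbc("jdbc:mysql://192.168.136.128:3306/logger", "logger", props)
    counts.orderBy(counts("count").desc).show(10)
  }
}
```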