1. 新建项目
在 IDEA 中新建 Maven 项目工程并创建子工程,然后在子工程的 pom.xml 文件中引入 Spark 依赖。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven descriptor for the spark-core child module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Parent project this module belongs to; groupId and version are declared only here. -->
    <parent>
        <artifactId>dintalk-classes</artifactId>
        <groupId>cn.dintalk.bigdata</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark-core</artifactId>

    <dependencies>
        <!-- Spark core 2.1.1; the _2.11 suffix is the Scala binary version of the artifact. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>
2. 准备数据文件(在程序的工作目录下新建 datas 目录,放入若干以空格分隔单词的文本文件,供后面代码中的 sc.textFile("datas") 读取)
3.代码编写
3.1 第一种写法(groupBy 分组后统计个数;其后两段代码依次为第二种、第三种写法)
package cn.dintalk.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Word count, variant 1: group identical words with `groupBy`, then use each group's size as the count. */
object Spark01_WordCount {

  /**
   * Counts the words found under the relative path "datas" and prints one
   * `(word, count)` pair per line to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Connect the application to the Spark framework.
    //    JDBC has a Connection; Spark has a SparkContext.
    val sparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("wordCount")
    val sc = new SparkContext(sparkConf)

    // Run the job inside try/finally so the context is stopped even when the job throws.
    try {
      // 2. Business logic.
      // 2.1 Read the input, one element per line of text.
      val lines: RDD[String] = sc.textFile("datas")

      // 2.2 Split every line on single spaces to obtain individual words.
      val words: RDD[String] = lines.flatMap(_.split(" "))

      // 2.3 Group identical words together so they can be counted.
      val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)

      // 2.4 Turn each group into a count:
      //     (hello, hello, hello), (word, word) -> (hello, 3), (word, 2)
      val wordCount: RDD[(String, Int)] = wordGroup.map {
        case (word, list) => (word, list.size)
      }

      // 2.5 Collect the result to the driver and print it to the console.
      val tuples: Array[(String, Int)] = wordCount.collect()
      tuples.foreach(println)
    } finally {
      // 3. Close the connection.
      sc.stop()
    }
  }
}
package cn.dintalk.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Word count, variant 2: map every word to `(word, 1)`, group the pairs by word,
 * then reduce each group by summing the second components.
 */
object Spark02_WordCount {

  /**
   * Counts the words found under the relative path "datas" and prints one
   * `(word, count)` pair per line to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Connect the application to the Spark framework.
    //    JDBC has a Connection; Spark has a SparkContext.
    val sparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("wordCount")
    val sc = new SparkContext(sparkConf)

    // Run the job inside try/finally so the context is stopped even when the job throws.
    try {
      // 2. Business logic.
      // 2.1 Read the input, one element per line of text.
      val lines: RDD[String] = sc.textFile("datas")

      // 2.2 Split every line on single spaces to obtain individual words.
      val words: RDD[String] = lines.flatMap(_.split(" "))

      // 2.3 Pair every word with an initial count of 1.
      val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))

      // 2.4 Group the pairs by their word; each group holds all (word, 1) pairs of one word.
      val wordGroup: RDD[(String, Iterable[(String, Int)])] =
        wordToOne.groupBy(t => t._1)

      // 2.5 Collapse each group into a single (word, total) pair by adding up the counts.
      //     The group key is not needed here because every pair already carries the word.
      val wordCount: RDD[(String, Int)] = wordGroup.map {
        case (_, list) =>
          list.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
      }

      // 2.6 Collect the result to the driver and print it to the console.
      val tuples: Array[(String, Int)] = wordCount.collect()
      tuples.foreach(println)
    } finally {
      // 3. Close the connection.
      sc.stop()
    }
  }
}
package cn.dintalk.bigdata.spark.core.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Word count, variant 3: map every word to `(word, 1)` and sum the ones per word with `reduceByKey`. */
object Spark03_WordCount {

  /**
   * Counts the words found under the relative path "datas" and prints one
   * `(word, count)` pair per line to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Connect the application to the Spark framework.
    val sparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("wordCount")
    val sc = new SparkContext(sparkConf)

    // Run the job inside try/finally so the context is stopped even when the job throws.
    try {
      // 2.1 Read the input, one element per line of text.
      val lines: RDD[String] = sc.textFile("datas")

      // 2.2 Split every line on single spaces to obtain individual words.
      val words: RDD[String] = lines.flatMap(_.split(" "))

      // 2.3 Pair every word with an initial count of 1.
      val wordToOne: RDD[(String, Int)] = words.map((_, 1))

      // 2.4 Add up the counts of identical words.
      val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

      // 2.5 Collect the result to the driver and print it to the console.
      val tuples: Array[(String, Int)] = wordCount.collect()
      tuples.foreach(println)
    } finally {
      // 3. Close the connection.
      sc.stop()
    }
  }
}
4.log4j控制日志输出
4.1 在 resources 目录下新建 log4j.properties 并配置
# Root logger: level ERROR, output goes to the "stdout" appender only.
log4j.rootLogger=ERROR, stdout

# Console appender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# timestamp, level, thread name, logger name, source line number, message
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

# Rolling file appender "R".
# NOTE(review): "R" is not listed on log4j.rootLogger above, so nothing in this file routes
# log events to it; add it there (e.g. "ERROR, stdout, R") if file output is wanted.
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=10240KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
4.2验证日志输出
重新运行程序,控制台只输出统计结果,不再输出 ERROR 级别以下的多余日志。