Writing a Spark WordCount Program and Submitting It to a Cluster [Scala and Java Versions]
1. Development environment
JDK 1.7.0_72
Maven 3.2.1
Scala 2.10.6
Spark 1.6.2
Hadoop 2.6.4
IntelliJ IDEA 2016.1.1
2. Create the project
1) Create a new Maven project
2) Add the dependencies to the pom file
The contents of pom.xml are as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuebusi</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>

        <!-- Dependency versions are managed centrally here -->
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <!-- Scala core library -->
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <!-- Spark core -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <!-- Hadoop client, used to access HDFS -->
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- Because the project may contain many programs, the class containing the main method is not specified here; the main class to run can be given manually when the Spark program is submitted -->
                            <!--
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.xuebusi.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                            -->
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
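Although the dependencies are now declared in the pom file, the Maven dependencies are still missing from External Libraries in the Project view.
Click the refresh button in the Maven Projects sidebar on the right to import them. The machine must be online, because Maven may need to download some dependencies from the central repository.
The newly imported Maven dependencies then appear under External Libraries in the Project sidebar on the left.
The pom.xml file still shows errors, however, because the src/main/ and src/test/ directories do not yet contain a scala directory.
Right-click the main directory and then the test directory, and choose New -> Directory to create a scala directory in each.
The icon of the newly created scala folders has a different color from that of the java folder, so click the refresh button in the Maven Projects sidebar once more, after which the color changes.
Create WordCount (of kind Object) under the scala directory.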
3. Write the WordCount program
Below is a simple Spark word-count program written in Scala:
package com.xuebusi.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by SYJ on 2017/1/23.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val conf: SparkConf = new SparkConf()
    // Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    // Read the input data from a file
    val lines: RDD[String] = sc.textFile(args(0))
    // Split each line into words on spaces
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each occurrence of a word with a count of 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Aggregate: total number of occurrences of each word
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by number of occurrences, in descending order
    val finalResult: RDD[(String, Int)] = result.sortBy(_._2, ascending = false)
    // Save the result to a file
    finalResult.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}
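To see what each transformation in the job computes without a cluster, the same steps can be tried on ordinary Scala collections. The following is only a sketch and not part of the original project: the object name LocalWordCount and the sample lines are made up for illustration, groupBy plus sum stands in for reduceByKey, and the tie-break on the word is an addition so that the output order is reproducible (the Spark job above sorts by count only).

```scala
object LocalWordCount {
  // Same steps as the Spark job, applied to an in-memory Seq instead of an RDD.
  def count(lines: Seq[String]): Seq[(String, Int)] = {
    // Split each line into words on spaces
    val words: Seq[String] = lines.flatMap(_.split(" "))
    // Pair each occurrence of a word with a count of 1
    val wordAndOne: Seq[(String, Int)] = words.map((_, 1))
    // groupBy + sum plays the role of reduceByKey(_ + _)
    val result: Seq[(String, Int)] =
      wordAndOne.groupBy(_._1).map { case (w, pairs) => (w, pairs.map(_._2).sum) }.toSeq
    // Descending by count; ties broken by word (an addition made for this sketch)
    result.sortBy { case (w, c) => (-c, w) }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample input
    count(Seq("hello spark", "hello hadoop", "hello")).foreach(println)
  }
}
```

As in the Spark version, consecutive spaces in a line produce empty strings that are counted as words.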
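4. Package
Package the finished WordCount program into a jar with the Maven plugin. The machine must be online during packaging as well, because Maven may download some dependencies from the central repository.
Right-click the jar name and choose "Copy Path" to get the jar's absolute path on the Windows disk: D:\bigdatacode\xbs-spark\target\spark-1.0-SNAPSHOT.jar. This path is needed when uploading the jar below.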
5. Upload the jar
Connect to the Spark cluster server with SecureCRT and upload spark-1.0-SNAPSHOT.jar to the server.
6. Synchronize the time
date -s "2017-01-23 19:19:30"
7. Start ZooKeeper
/root/apps/zookeeper/bin/zkServer.sh start
8. Start HDFS
/root/apps/hadoop/sbin/start-dfs.sh
Active HDFS NameNode:
Standby HDFS NameNode:
9. Start the Spark cluster
/root/apps/spark/sbin/start-all.sh
To start a single Master process, use the following command:
/root/apps/spark/sbin/start-master.sh
Active Spark Master:
Standby Spark Master:
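10. Prepare the input data
11. Submit the Spark program
The Spark WordCount program takes two arguments, an input directory and an output directory. First make sure the output directory does not exist; if it does, delete it: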
hdfs dfs -rm -r /wordcount/output
Submit the Spark program with the spark-submit script:
/root/apps/spark/bin/spark-submit \
--master spark://hadoop01:7077,hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 7 \
--class com.xuebusi.spark.WordCount \
/root/spark-1.0-SNAPSHOT.jar \
hdfs://hadoop01:9000/wordcount/input \
hdfs://hadoop01:9000/wordcount/output
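The WordCount program reads args(0) and args(1) directly, so submitting it with a missing argument fails with an ArrayIndexOutOfBoundsException. One way to report a clearer message is to check the arguments first. The following is a minimal sketch, not code from the original project: the object WordCountArgs, its parse method, and the usage text are all hypothetical.

```scala
object WordCountArgs {
  // Hypothetical helper: Right((input, output)) when exactly two non-empty
  // arguments are given, otherwise Left(usage message).
  def parse(args: Array[String]): Either[String, (String, String)] =
    args match {
      case Array(input, output) if input.nonEmpty && output.nonEmpty =>
        Right((input, output))
      case _ =>
        Left("Usage: WordCount <input path> <output path>")
    }

  def main(args: Array[String]): Unit = parse(args) match {
    case Right((input, output)) =>
      println("input=" + input + ", output=" + output)
    case Left(usage) =>
      System.err.println(usage)
      sys.exit(1)
  }
}
```

In the job itself, the Right branch would be the place to call sc.textFile(input) and saveAsTextFile(output).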
Watch the program run in the Spark web UI.
12. View the results
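saveAsTextFile writes one line per element using the element's toString, so each line in the part files under the output directory has the form (word,count). If those lines need to be read back into pairs, something like the following could be used. This is a sketch under that assumption only: the object ResultLine and its parse method are hypothetical names, and the last comma is taken as the separator because a word produced by splitting on spaces may itself contain commas.

```scala
object ResultLine {
  // Parses a line of the form "(word,count)"; returns None for any other shape.
  def parse(line: String): Option[(String, Int)] = {
    val comma = line.lastIndexOf(',')
    if (line.startsWith("(") && line.endsWith(")") && comma > 0) {
      val word = line.substring(1, comma)
      val count = line.substring(comma + 1, line.length - 1)
      try Some((word, count.toInt))
      catch { case _: NumberFormatException => None }
    } else None
  }
}
```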
Appendix 1: Packaging log
D:\java\jdk1..0_72\bin\java -Dmaven.home=D:\apache-maven-3.2. -Dclassworlds.conf=D:\apache-maven-3.2.\bin\m2.conf -Didea.launcher.port= "-Didea.launcher.bin.path=D:\java\IntelliJ_IDEA\IntelliJ IDEA Community Edition 2016.1.1\bin" -Dfile.encoding=UTF- -classpath "D:\apache-maven-3.2.1\boot\plexus-classworlds-2.5.1.jar;D:\java\IntelliJ_IDEA\IntelliJ IDEA Community Edition 2016.1.1\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain org.codehaus.classworlds.Launcher -Didea.version=2016.1. package
[INFO] Scanning for projects...
[INFO]
[INFO] Using the builder org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder with a thread count of
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building spark 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ spark ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying resource
[INFO]
[INFO] --- maven-compiler-plugin:2.5.:compile (default-compile) @ spark ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- scala-maven-plugin:3.2.:compile (default) @ spark ---
[WARNING] Expected all dependencies to require Scala version: 2.10.
[WARNING] com.xuebusi:spark:1.0-SNAPSHOT requires scala version: 2.10.
[WARNING] com.twitter:chill_2.:0.5. requires scala version: 2.10.
[WARNING] Multiple versions of scala libraries detected!
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ spark ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory D:\bigdatacode\spark-wordcount\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:2.5.:testCompile (default-testCompile) @ spark ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- scala-maven-plugin:3.2.:testCompile (default) @ spark ---
[WARNING] Expected all dependencies to require Scala version: 2.10.
[WARNING] com.xuebusi:spark:1.0-SNAPSHOT requires scala version: 2.10.
[WARNING] com.twitter:chill_2.:0.5. requires scala version: 2.10.
[WARNING] Multiple versions of scala libraries detected!
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.:test (default-test) @ spark ---
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom (3 KB at 1.7 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom (3 KB at 2.4 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.pom (6 KB at 3.2 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-annotations/3.1/maven-plugin-annotations-3.1.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-annotations/3.1/maven-plugin-annotations-3.1.pom (2 KB at 1.7 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-tools/3.1/maven-plugin-tools-3.1.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-tools/3.1/maven-plugin-tools-3.1.pom (16 KB at 12.0 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.jar
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.jar
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.jar
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-annotations/3.1/maven-plugin-annotations-3.1.jar
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/plugin-tools/maven-plugin-annotations/3.1/maven-plugin-annotations-3.1.jar (14 KB at 10.6 KB/sec)
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.jar (34 KB at 21.5 KB/sec)
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.jar (257 KB at 161.0 KB/sec)
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.jar (115 KB at 55.1 KB/sec)
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ spark ---
[INFO] Building jar: D:\bigdatacode\spark-wordcount\target\spark-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:2.4.:shade (default) @ spark ---
[INFO] Including org.scala-lang:scala-library:jar:2.10. in the shaded jar.
[INFO] Including org.apache.spark:spark-core_2.:jar:1.6. in the shaded jar.
[INFO] Including org.apache.avro:avro-mapred:jar:hadoop2:1.7. in the shaded jar.
[INFO] Including org.apache.avro:avro-ipc:jar:1.7. in the shaded jar.
[INFO] Including org.apache.avro:avro-ipc:jar:tests:1.7. in the shaded jar.
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9. in the shaded jar.
[INFO] Including org.codehaus.jackson:jackson-mapper-asl:jar:1.9. in the shaded jar.
[INFO] Including com.twitter:chill_2.:jar:0.5. in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.21 in the shaded jar.
[INFO] Including com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:1.2 in the shaded jar.
[INFO] Including com.twitter:chill-java:jar:0.5. in the shaded jar.
[INFO] Including org.apache.xbean:xbean-asm5-shaded:jar:4.4 in the shaded jar.
[INFO] Including org.apache.spark:spark-launcher_2.:jar:1.6. in the shaded jar.
[INFO] Including org.apache.spark:spark-network-common_2.:jar:1.6. in the shaded jar.
[INFO] Including org.apache.spark:spark-network-shuffle_2.:jar:1.6. in the shaded jar.
[INFO] Including org.fusesource.leveldbjni:leveldbjni-all:jar:1.8 in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-annotations:jar:2.4. in the shaded jar.
[INFO] Including org.apache.spark:spark-unsafe_2.:jar:1.6. in the shaded jar.
[INFO] Including net.java.dev.jets3t:jets3t:jar:0.7. in the shaded jar.
[INFO] Including commons-codec:commons-codec:jar:1.3 in the shaded jar.
[INFO] Including commons-httpclient:commons-httpclient:jar:3.1 in the shaded jar.
[INFO] Including org.apache.curator:curator-recipes:jar:2.4. in the shaded jar.
[INFO] Including org.apache.curator:curator-framework:jar:2.4. in the shaded jar.
[INFO] Including org.apache.zookeeper:zookeeper:jar:3.4. in the shaded jar.
[INFO] Including jline:jline:jar:0.9. in the shaded jar.
[INFO] Including com.google.guava:guava:jar:14.0. in the shaded jar.
[INFO] Including org.eclipse.jetty.orbit:javax.servlet:jar:3.0..v201112011016 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3. in the shaded jar.
[INFO] Including org.apache.commons:commons-math3:jar:3.4. in the shaded jar.
[INFO] Including com.google.code.findbugs:jsr305:jar:1.3. in the shaded jar.
[INFO] Including org.slf4j:slf4j-api:jar:1.7. in the shaded jar.
[INFO] Including org.slf4j:jul-to-slf4j:jar:1.7. in the shaded jar.
[INFO] Including org.slf4j:jcl-over-slf4j:jar:1.7. in the shaded jar.
[INFO] Including log4j:log4j:jar:1.2. in the shaded jar.
[INFO] Including org.slf4j:slf4j-log4j12:jar:1.7. in the shaded jar.
[INFO] Including com.ning:compress-lzf:jar:1.0. in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.2.1 in the shaded jar.
[INFO] Including net.jpountz.lz4:lz4:jar:1.3. in the shaded jar.
[INFO] Including org.roaringbitmap:RoaringBitmap:jar:0.5. in the shaded jar.
[INFO] Including commons-net:commons-net:jar:2.2 in the shaded jar.
[INFO] Including com.typesafe.akka:akka-remote_2.:jar:2.3. in the shaded jar.
[INFO] Including com.typesafe.akka:akka-actor_2.:jar:2.3. in the shaded jar.
[INFO] Including com.typesafe:config:jar:1.2. in the shaded jar.
[INFO] Including io.netty:netty:jar:3.8..Final in the shaded jar.
[INFO] Including com.google.protobuf:protobuf-java:jar:2.5. in the shaded jar.
[INFO] Including org.uncommons.maths:uncommons-maths:jar:1.2.2a in the shaded jar.
[INFO] Including com.typesafe.akka:akka-slf4j_2.:jar:2.3. in the shaded jar.
[INFO] Including org.json4s:json4s-jackson_2.:jar:3.2. in the shaded jar.
[INFO] Including org.json4s:json4s-core_2.:jar:3.2. in the shaded jar.
[INFO] Including org.json4s:json4s-ast_2.:jar:3.2. in the shaded jar.
[INFO] Including org.scala-lang:scalap:jar:2.10. in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.10. in the shaded jar.
[INFO] Including com.sun.jersey:jersey-server:jar:1.9 in the shaded jar.
[INFO] Including asm:asm:jar:3.1 in the shaded jar.
[INFO] Including com.sun.jersey:jersey-core:jar:1.9 in the shaded jar.
[INFO] Including org.apache.mesos:mesos:jar:shaded-protobuf:0.21. in the shaded jar.
[INFO] Including io.netty:netty-all:jar:4.0..Final in the shaded jar.
[INFO] Including com.clearspring.analytics:stream:jar:2.7. in the shaded jar.
[INFO] Including io.dropwizard.metrics:metrics-core:jar:3.1. in the shaded jar.
[INFO] Including io.dropwizard.metrics:metrics-jvm:jar:3.1. in the shaded jar.
[INFO] Including io.dropwizard.metrics:metrics-json:jar:3.1. in the shaded jar.
[INFO] Including io.dropwizard.metrics:metrics-graphite:jar:3.1. in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-databind:jar:2.4. in the shaded jar.
[INFO] Including com.fasterxml.jackson.core:jackson-core:jar:2.4. in the shaded jar.
[INFO] Including com.fasterxml.jackson.module:jackson-module-scala_2.:jar:2.4. in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.10. in the shaded jar.
[INFO] Including com.thoughtworks.paranamer:paranamer:jar:2.6 in the shaded jar.
[INFO] Including org.apache.ivy:ivy:jar:2.4. in the shaded jar.
[INFO] Including oro:oro:jar:2.0. in the shaded jar.
[INFO] Including org.tachyonproject:tachyon-client:jar:0.8. in the shaded jar.
[INFO] Including commons-lang:commons-lang:jar:2.4 in the shaded jar.
[INFO] Including commons-io:commons-io:jar:2.4 in the shaded jar.
[INFO] Including org.tachyonproject:tachyon-underfs-hdfs:jar:0.8. in the shaded jar.
[INFO] Including org.tachyonproject:tachyon-underfs-s3:jar:0.8. in the shaded jar.
[INFO] Including org.tachyonproject:tachyon-underfs-local:jar:0.8. in the shaded jar.
[INFO] Including net.razorvine:pyrolite:jar:4.9 in the shaded jar.
[INFO] Including net.sf.py4j:py4j:jar:0.9 in the shaded jar.
[INFO] Including org.spark-project.spark:unused:jar:1.0. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-client:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-common:jar:2.6. in the shaded jar.
[INFO] Including commons-cli:commons-cli:jar:1.2 in the shaded jar.
[INFO] Including xmlenc:xmlenc:jar:0.52 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2. in the shaded jar.
[INFO] Including commons-logging:commons-logging:jar:1.1. in the shaded jar.
[INFO] Including commons-configuration:commons-configuration:jar:1.6 in the shaded jar.
[INFO] Including commons-digester:commons-digester:jar:1.8 in the shaded jar.
[INFO] Including commons-beanutils:commons-beanutils:jar:1.7. in the shaded jar.
[INFO] Including commons-beanutils:commons-beanutils-core:jar:1.8. in the shaded jar.
[INFO] Including org.apache.avro:avro:jar:1.7. in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.2. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-auth:jar:2.6. in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpclient:jar:4.2. in the shaded jar.
[INFO] Including org.apache.httpcomponents:httpcore:jar:4.2. in the shaded jar.
[INFO] Including org.apache.directory.server:apacheds-kerberos-codec:jar:2.0.-M15 in the shaded jar.
[INFO] Including org.apache.directory.server:apacheds-i18n:jar:2.0.-M15 in the shaded jar.
[INFO] Including org.apache.directory.api:api-asn1-api:jar:1.0.-M20 in the shaded jar.
[INFO] Including org.apache.directory.api:api-util:jar:1.0.-M20 in the shaded jar.
[INFO] Including org.apache.curator:curator-client:jar:2.6. in the shaded jar.
[INFO] Including org.htrace:htrace-core:jar:3.0. in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.4. in the shaded jar.
[INFO] Including org.tukaani:xz:jar:1.0 in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-hdfs:jar:2.6. in the shaded jar.
[INFO] Including org.mortbay.jetty:jetty-util:jar:6.1. in the shaded jar.
[INFO] Including xerces:xercesImpl:jar:2.9. in the shaded jar.
[INFO] Including xml-apis:xml-apis:jar:1.3. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-yarn-client:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-yarn-server-common:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-yarn-api:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-yarn-common:jar:2.6. in the shaded jar.
[INFO] Including javax.xml.bind:jaxb-api:jar:2.2. in the shaded jar.
[INFO] Including javax.xml.stream:stax-api:jar:1.0- in the shaded jar.
[INFO] Including javax.activation:activation:jar:1.1 in the shaded jar.
[INFO] Including javax.servlet:servlet-api:jar:2.5 in the shaded jar.
[INFO] Including com.sun.jersey:jersey-client:jar:1.9 in the shaded jar.
[INFO] Including org.codehaus.jackson:jackson-jaxrs:jar:1.9. in the shaded jar.
[INFO] Including org.codehaus.jackson:jackson-xc:jar:1.9. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.6. in the shaded jar.
[INFO] Including org.apache.hadoop:hadoop-annotations:jar:2.6. in the shaded jar.
[WARNING] commons-logging-1.1..jar, jcl-over-slf4j-1.7..jar define overlapping classes:
[WARNING] - org.apache.commons.logging.impl.NoOpLog
[WARNING] - org.apache.commons.logging.impl.SimpleLog
[WARNING] - org.apache.commons.logging.LogFactory
[WARNING] - org.apache.commons.logging.LogConfigurationException
[WARNING] - org.apache.commons.logging.impl.SimpleLog$
[WARNING] - org.apache.commons.logging.Log
[WARNING] commons-beanutils-core-1.8..jar, commons-beanutils-1.7..jar define overlapping classes:
[WARNING] - org.apache.commons.beanutils.WrapDynaBean
[WARNING] - org.apache.commons.beanutils.Converter
[WARNING] - org.apache.commons.beanutils.converters.IntegerConverter
[WARNING] - org.apache.commons.beanutils.locale.LocaleBeanUtilsBean
[WARNING] - org.apache.commons.beanutils.locale.converters.DecimalLocaleConverter
[WARNING] - org.apache.commons.beanutils.locale.converters.DoubleLocaleConverter
[WARNING] - org.apache.commons.beanutils.converters.ShortConverter
[WARNING] - org.apache.commons.beanutils.converters.StringArrayConverter
[WARNING] - org.apache.commons.beanutils.locale.LocaleConvertUtilsBean
[WARNING] - org.apache.commons.beanutils.LazyDynaClass
[WARNING] - more...
[WARNING] hadoop-yarn-common-2.6..jar, hadoop-yarn-api-2.6..jar define overlapping classes:
[WARNING] - org.apache.hadoop.yarn.factories.package-info
[WARNING] - org.apache.hadoop.yarn.util.package-info
[WARNING] - org.apache.hadoop.yarn.factory.providers.package-info
[WARNING] commons-beanutils-core-1.8..jar, commons-collections-3.2..jar, commons-beanutils-1.7..jar define overlapping classes:
[WARNING] - org.apache.commons.collections.FastHashMap$EntrySet
[WARNING] - org.apache.commons.collections.ArrayStack
[WARNING] - org.apache.commons.collections.FastHashMap$
[WARNING] - org.apache.commons.collections.FastHashMap$KeySet
[WARNING] - org.apache.commons.collections.FastHashMap$CollectionView
[WARNING] - org.apache.commons.collections.BufferUnderflowException
[WARNING] - org.apache.commons.collections.Buffer
[WARNING] - org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator
[WARNING] - org.apache.commons.collections.FastHashMap$Values
[WARNING] - org.apache.commons.collections.FastHashMap
[WARNING] kryo-2.21.jar, objenesis-1.2.jar define overlapping classes:
[WARNING] - org.objenesis.Objenesis
[WARNING] - org.objenesis.strategy.StdInstantiatorStrategy
[WARNING] - org.objenesis.instantiator.basic.ObjectStreamClassInstantiator
[WARNING] - org.objenesis.instantiator.sun.SunReflectionFactorySerializationInstantiator
[WARNING] - org.objenesis.instantiator.perc.PercSerializationInstantiator
[WARNING] - org.objenesis.instantiator.NullInstantiator
[WARNING] - org.objenesis.instantiator.jrockit.JRockitLegacyInstantiator
[WARNING] - org.objenesis.instantiator.gcj.GCJInstantiatorBase
[WARNING] - org.objenesis.ObjenesisException
[WARNING] - org.objenesis.instantiator.basic.ObjectInputStreamInstantiator$MockStream
[WARNING] - more...
[WARNING] kryo-2.21.jar, reflectasm-1.07-shaded.jar define overlapping classes:
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Frame
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Label
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldWriter
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Item
[WARNING] - com.esotericsoftware.reflectasm.AccessClassLoader
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Edge
[WARNING] - com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor
[WARNING] - more...
[WARNING] minlog-1.2.jar, kryo-2.21.jar define overlapping classes:
[WARNING] - com.esotericsoftware.minlog.Log$Logger
[WARNING] - com.esotericsoftware.minlog.Log
[WARNING] servlet-api-2.5.jar, javax.servlet-3.0..v201112011016.jar define overlapping classes:
[WARNING] - javax.servlet.ServletRequestWrapper
[WARNING] - javax.servlet.FilterChain
[WARNING] - javax.servlet.SingleThreadModel
[WARNING] - javax.servlet.http.HttpServletResponse
[WARNING] - javax.servlet.http.HttpUtils
[WARNING] - javax.servlet.ServletContextAttributeEvent
[WARNING] - javax.servlet.ServletContextAttributeListener
[WARNING] - javax.servlet.http.HttpServlet
[WARNING] - javax.servlet.http.HttpSessionAttributeListener
[WARNING] - javax.servlet.http.HttpServletRequest
[WARNING] - more...
[WARNING] guava-14.0..jar, spark-network-common_2.-1.6..jar define overlapping classes:
[WARNING] - com.google.common.base.Optional$$
[WARNING] - com.google.common.base.Supplier
[WARNING] - com.google.common.base.Function
[WARNING] - com.google.common.base.Optional
[WARNING] - com.google.common.base.Optional$
[WARNING] - com.google.common.base.Absent
[WARNING] - com.google.common.base.Present
[WARNING] hadoop-yarn-common-2.6..jar, hadoop-yarn-client-2.6..jar define overlapping classes:
[WARNING] - org.apache.hadoop.yarn.client.api.impl.package-info
[WARNING] - org.apache.hadoop.yarn.client.api.package-info
[WARNING] unused-1.0..jar, spark-core_2.-1.6..jar, spark-network-shuffle_2.-1.6..jar, spark-launcher_2.-1.6..jar, spark-unsafe_2.-1.6..jar, spark-network-common_2.-1.6..jar define overlapping classes:
[WARNING] - org.apache.spark.unused.UnusedStubClass
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing D:\bigdatacode\spark-wordcount\target\spark-1.0-SNAPSHOT.jar with D:\bigdatacode\spark-wordcount\target\spark-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: D:\bigdatacode\spark-wordcount\dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: : min
[INFO] Finished at: --23T12::+:
[INFO] Final Memory: 18M/115M
[INFO] ------------------------------------------------------------------------
Process finished with exit code
Appendix 2: Program execution log
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
// :: INFO SparkContext: Running Spark version 1.6.
// :: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
// :: INFO SecurityManager: Changing view acls to: root
// :: INFO SecurityManager: Changing modify acls to: root
// :: INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
// :: INFO Utils: Successfully started service 'sparkDriver' on port .
// :: INFO Slf4jLogger: Slf4jLogger started
// :: INFO Remoting: Starting remoting
// :: INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.71.11:50102]
// :: INFO Utils: Successfully started service 'sparkDriverActorSystem' on port .
// :: INFO SparkEnv: Registering MapOutputTracker
// :: INFO SparkEnv: Registering BlockManagerMaster
// :: INFO DiskBlockManager: Created local directory at /tmp/blockmgr-677f9442-f73a--b629-297dc3409fa8
// :: INFO MemoryStore: MemoryStore started with capacity 517.4 MB
// :: INFO SparkEnv: Registering OutputCommitCoordinator
// :: INFO Utils: Successfully started service 'SparkUI' on port .
// :: INFO SparkUI: Started SparkUI at http://192.168.71.11:4040
// :: INFO HttpFileServer: HTTP File server directory is /tmp/spark-1be891d5-88d1-4e02-970b-023ff1e8c618/httpd-b73f26ff-5d13-4abd-953b-164a3f2d18e7
// :: INFO HttpServer: Starting HTTP Server
// :: INFO Utils: Successfully started service 'HTTP file server' on port .
// :: INFO SparkContext: Added JAR file:/root/spark-1.0-SNAPSHOT.jar at http://192.168.71.11:38719/jars/spark-1.0-SNAPSHOT.jar with timestamp 1485228982685
// :: INFO AppClient$ClientEndpoint: Connecting to master spark://hadoop01:7077...
// :: INFO AppClient$ClientEndpoint: Connecting to master spark://hadoop02:7077...
// :: INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app--
// :: INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port .
// :: INFO NettyBlockTransferService: Server created on
// :: INFO BlockManagerMaster: Trying to register BlockManager
// :: INFO BlockManagerMasterEndpoint: Registering block manager 192.168.71.11: with 517.4 MB RAM, BlockManagerId(driver, 192.168.71.11, )
// :: INFO BlockManagerMaster: Registered BlockManager
// :: INFO AppClient$ClientEndpoint: Executor added: app--/ on worker--192.168.71.12- (192.168.71.12:) with cores
// :: INFO SparkDeploySchedulerBackend: Granted executor ID app--/ on hostPort 192.168.71.12: with cores, 512.0 MB RAM
// :: INFO AppClient$ClientEndpoint: Executor added: app--/ on worker--192.168.71.13- (192.168.71.13:) with cores
// :: INFO SparkDeploySchedulerBackend: Granted executor ID app--/ on hostPort 192.168.71.13: with cores, 512.0 MB RAM
// :: INFO AppClient$ClientEndpoint: Executor updated: app--/ is now RUNNING
// :: INFO AppClient$ClientEndpoint: Executor updated: app--/ is now RUNNING
// :: INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
// :: INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 153.6 KB, free 153.6 KB)
// :: INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 167.5 KB)
// :: INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.71.11: (size: 13.9 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from textFile at WordCount.scala:
// :: INFO FileInputFormat: Total input paths to process :
// :: INFO SparkContext: Starting job: sortBy at WordCount.scala:
// :: INFO DAGScheduler: Registering RDD (map at WordCount.scala:)
// :: INFO DAGScheduler: Got job (sortBy at WordCount.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (sortBy at WordCount.scala:)
// :: INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage )
// :: INFO DAGScheduler: Missing parents: List(ShuffleMapStage )
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at map at WordCount.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 171.6 KB)
// :: INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 173.9 KB)
// :: INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.71.11: (size: 2.3 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at map at WordCount.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 0.0 with tasks
// :: INFO SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop02:) with ID
// :: INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID , hadoop02, partition ,NODE_LOCAL, bytes)
// :: INFO BlockManagerMasterEndpoint: Registering block manager hadoop02: with 146.2 MB RAM, BlockManagerId(, hadoop02, )
// :: INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop02: (size: 2.3 KB, free: 146.2 MB)
// :: INFO SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop03:) with ID
// :: INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID , hadoop03, partition ,NODE_LOCAL, bytes)
// :: INFO BlockManagerMasterEndpoint: Registering block manager hadoop03: with 146.2 MB RAM, BlockManagerId(, hadoop03, )
// :: INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop02: (size: 13.9 KB, free: 146.2 MB)
// :: INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID ) in ms on hadoop02 (/)
// :: INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop03: (size: 2.3 KB, free: 146.2 MB)
// :: INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop03: (size: 13.9 KB, free: 146.2 MB)
// :: INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID ) in ms on hadoop03 (/)
// :: INFO DAGScheduler: ShuffleMapStage (map at WordCount.scala:) finished in 68.357 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set()
// :: INFO DAGScheduler: waiting: Set(ResultStage )
// :: INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: failed: Set()
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at sortBy at WordCount.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.6 KB, free 177.5 KB)
// :: INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.0 KB, free 179.5 KB)
// :: INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.71.11: (size: 2.0 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at sortBy at WordCount.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 1.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID , hadoop03, partition ,NODE_LOCAL, bytes)
// :: INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID , hadoop02, partition ,NODE_LOCAL, bytes)
// :: INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop03: (size: 2.0 KB, free: 146.2 MB)
// :: INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle to hadoop03:
// :: INFO MapOutputTrackerMaster: Size of output statuses for shuffle is bytes
// :: INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop02: (size: 2.0 KB, free: 146.2 MB)
// :: INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID ) in ms on hadoop03 (/)
// :: INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle to hadoop02:
// :: INFO DAGScheduler: ResultStage (sortBy at WordCount.scala:) finished in 1.615 s
// :: INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID ) in ms on hadoop02 (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Job finished: sortBy at WordCount.scala:, took 71.451062 s
// :: INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
// :: INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
// :: INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
// :: INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
// :: INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
// :: INFO SparkContext: Starting job: saveAsTextFile at WordCount.scala:
// :: INFO DAGScheduler: Registering RDD (sortBy at WordCount.scala:)
// :: INFO DAGScheduler: Got job (saveAsTextFile at WordCount.scala:) with output partitions
// :: INFO DAGScheduler: Final stage: ResultStage (saveAsTextFile at WordCount.scala:)
// :: INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage )
// :: INFO DAGScheduler: Missing parents: List(ShuffleMapStage )
// :: INFO DAGScheduler: Submitting ShuffleMapStage (MapPartitionsRDD[] at sortBy at WordCount.scala:), which has no missing parents
// :: INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.5 KB, free 183.1 KB)
// :: INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.0 KB, free 185.1 KB)
// :: INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.71.11: (size: 2.0 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ShuffleMapStage (MapPartitionsRDD[] at sortBy at WordCount.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 3.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID , hadoop02, partition ,NODE_LOCAL, bytes)
// :: INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on hadoop02: (size: 2.0 KB, free: 146.2 MB)
// :: INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID , hadoop02, partition ,NODE_LOCAL, bytes)
// :: INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID ) in ms on hadoop02 (/)
// :: INFO DAGScheduler: ShuffleMapStage (sortBy at WordCount.scala:) finished in 0.643 s
// :: INFO DAGScheduler: looking for newly runnable stages
// :: INFO DAGScheduler: running: Set()
// :: INFO DAGScheduler: waiting: Set(ResultStage )
// :: INFO DAGScheduler: failed: Set()
// :: INFO DAGScheduler: Submitting ResultStage (MapPartitionsRDD[] at saveAsTextFile at WordCount.scala:), which has no missing parents
// :: INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID ) in ms on hadoop02 (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
// :: INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 64.9 KB, free 250.0 KB)
// :: INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 22.5 KB, free 272.5 KB)
// :: INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.71.11: (size: 22.5 KB, free: 517.4 MB)
// :: INFO SparkContext: Created broadcast from broadcast at DAGScheduler.scala:
// :: INFO DAGScheduler: Submitting missing tasks from ResultStage (MapPartitionsRDD[] at saveAsTextFile at WordCount.scala:)
// :: INFO TaskSchedulerImpl: Adding task set 4.0 with tasks
// :: INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID , hadoop02, partition ,NODE_LOCAL, bytes)
// :: INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop02: (size: 22.5 KB, free: 146.2 MB)
// :: INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle to hadoop02:
// :: INFO MapOutputTrackerMaster: Size of output statuses for shuffle is bytes
// :: INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID , hadoop03, partition ,ANY, bytes)
// :: INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop03: (size: 22.5 KB, free: 146.2 MB)
// :: INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID ) in ms on hadoop02 (/)
// :: INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle to hadoop03:
// :: INFO DAGScheduler: ResultStage (saveAsTextFile at WordCount.scala:) finished in 5.782 s
// :: INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID ) in ms on hadoop03 (/)
// :: INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
// :: INFO DAGScheduler: Job finished: saveAsTextFile at WordCount.scala:, took 7.582931 s
// :: INFO SparkUI: Stopped Spark web UI at http://192.168.71.11:4040
// :: INFO SparkDeploySchedulerBackend: Shutting down all executors
// :: INFO SparkDeploySchedulerBackend: Asking each executor to shut down
// :: INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
// :: INFO MemoryStore: MemoryStore cleared
// :: INFO BlockManager: BlockManager stopped
// :: INFO BlockManagerMaster: BlockManagerMaster stopped
// :: INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
// :: INFO SparkContext: Successfully stopped SparkContext
// :: INFO ShutdownHookManager: Shutdown hook called
// :: INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
// :: INFO ShutdownHookManager: Deleting directory /tmp/spark-1be891d5-88d1-4e02-970b-023ff1e8c618/httpd-b73f26ff-5d13-4abd-953b-164a3f2d18e7
// :: INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
// :: INFO ShutdownHookManager: Deleting directory /tmp/spark-1be891d5-88d1-4e02-970b-023ff1e8c618
// :: INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
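13. Writing the Spark WordCount program in Java
Above, we wrote a Spark WordCount program in Scala and ran it successfully on the Spark cluster. Now we write the same word-count program in Java, in the same project.
1) Modify the pom file
The original pom file only configured the Maven plugin that compiles Scala code. To compile Java code as well, we need to add the Maven compiler plugin for Java.
The complete pom.xml is shown below (replace the contents of the original pom file; this does not affect the Scala version of WordCount):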
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.xuebusi</groupId>
<artifactId>spark</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.6</scala.version>
<spark.version>1.6.2</spark.version>
<hadoop.version>2.6.4</hadoop.version>
</properties>

<dependencies>
<!-- Scala core library; can be omitted if the Spark program is written only in Java -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Hadoop client, used to access HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Spark core library -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<!-- scala-maven-plugin: Maven plugin that compiles Scala code -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- maven-compiler-plugin: Maven plugin that compiles Java code -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- Execution settings for the Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Execution settings for the Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- maven-shade-plugin: Maven plugin that builds the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
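2) Refresh the dependencies
Once the pom file is ready, click the refresh button in the Maven Projects panel on the right side of IDEA; only then are the Maven dependencies actually imported. If a dependency is missing from the local Maven repository, Maven downloads the jar automatically from the central repository, so the machine must have network access.
3) Create the JavaWordCount class
Create the JavaWordCount class under the src/main/java directory.
The complete code of the JavaWordCount class is as follows: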
package com.xuebusi.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * A simple Spark application written with the Java API only.
 * The processing here is deliberately simple; a real project would
 * usually involve more complex business logic, such as working with
 * a database, HDFS, Kafka, or HBase, or integrating with other
 * third-party components.
 * If you are not familiar with Scala, you can use Java instead.
 * Neither language is inherently better: anything that can be done
 * in Java can also be done in Scala.
 *
 * Created by SYJ on 2017/1/23.
 */
public class JavaWordCount {
    /**
     * IDEA shortcut for generating the main method: psvm
     * IDEA shortcut for extracting a variable: Ctrl+Alt+V
     *
     * JDK 1.7 does not support lambda expressions, so many of the
     * arguments below are passed as anonymous classes.
     *
     * Writing a Spark program in Java is not hard. Much of the code
     * does not have to be typed by hand, because IDEA offers good
     * code hints and completion (press Tab to accept a suggestion).
     *
     * @param args args[0] is the input path, args[1] is the output path
     */
    public static void main(String[] args) {
        // Create the SparkConf and set the application name
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");

        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the data from the file system.
        // Note: Java indexes arrays with square brackets, args[0],
        // while Scala uses parentheses, args(0).
        // JavaRDD is a Java-friendly wrapper around Spark's RDD.
        JavaRDD<String> lines = sc.textFile(args[0]);

        /**
         * Split the lines into words.
         *
         * FlatMapFunction is implemented here as an anonymous class.
         * Its first type parameter is the input type and its second
         * is the output element type.
         * The input is one line of text; the result is an Iterable
         * (in Spark 1.6) holding the words obtained by splitting the
         * line on spaces.
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        /**
         * Emit a count of 1 for every occurrence of a word.
         * Scala uses map for this step; the Java API uses mapToPair,
         * which maps each element to a key-value pair (a Tuple2).
         *
         * PairFunction has three type parameters:
         * (1) the first is the input type, here the word;
         * (2) the second and third are the types of the two elements
         *     of the returned tuple, here the word and the number 1.
         *
         * Java has no built-in tuple type, so the Java API uses
         * Scala's scala.Tuple2 class directly.
         */
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /**
         * Group and aggregate.
         *
         * groupByKey would also work, but it is less efficient.
         * reduceByKey aggregates locally within each partition first
         * and then aggregates globally, much like a combiner.
         *
         * reduceByKey takes a Function2, implemented here as an
         * anonymous class. Function2 has three type parameters: the
         * first two are the types of the two values being added, and
         * the last is the type of their sum.
         * reduceByKey aggregates only the values; the keys are
         * handled by Spark.
         */
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        /**
         * Swap key and value in preparation for sorting.
         */
        JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                // First way to swap the tuple elements (a second way follows below)
                return new Tuple2<Integer, String>(tp._2, tp._1);
            }
        });

        /**
         * Sort.
         *
         * JavaPairRDD offers only sortByKey, which sorts by key.
         * We want to sort by value, so we swap the two tuple elements,
         * sort by key (false means descending), and then swap back.
         */
        JavaPairRDD<Integer, String> sortedPair = swappedPair.sortByKey(false);

        JavaPairRDD<String, Integer> finalResult = sortedPair.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                // Second way to swap the tuple elements: Tuple2.swap()
                return tp.swap();
            }
        });

        // Save the result to the file system
        finalResult.saveAsTextFile(args[1]);

        // Release resources
        sc.stop();
    }
}
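The local-then-global aggregation that reduceByKey performs, and the final ordering by count, can be checked on a laptop without a cluster. The sketch below is a plain-Java imitation, not Spark code: the class LocalWordCountSketch and its methods are hypothetical names introduced only for illustration, and each inner list stands in for one partition of the input. It is written without lambdas so that it also compiles on JDK 1.7.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project: it imitates the
// JavaWordCount pipeline on in-memory data so the logic can be checked locally.
final class LocalWordCountSketch {

    // Local aggregation: count the words of a single partition.
    static Map<String, Integer> countPartition(List<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    // Global aggregation: merge the partial counts of all partitions.
    static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<String, Integer>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                Integer old = total.get(e.getKey());
                total.put(e.getKey(), old == null ? e.getValue() : old + e.getValue());
            }
        }
        return total;
    }

    // Order the (word, count) entries by count, largest first.
    static List<Map.Entry<String, Integer>> sortByCountDesc(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                return b.getValue().compareTo(a.getValue());
            }
        });
        return entries;
    }

    // Full pipeline: per-partition counts, merge, then sort by count.
    static List<Map.Entry<String, Integer>> run(List<List<String>> partitions) {
        List<Map<String, Integer>> partials = new ArrayList<Map<String, Integer>>();
        for (List<String> partition : partitions) {
            partials.add(countPartition(partition));
        }
        return sortByCountDesc(mergePartials(partials));
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<List<String>>();
        partitions.add(Arrays.asList("hello spark", "hello hadoop"));
        partitions.add(Arrays.asList("hello spark"));
        for (Map.Entry<String, Integer> e : run(partitions)) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

With the sample data in main, the counts are hello=3, spark=2, hadoop=1, printed in that order. Words with equal counts may appear in any relative order, both here and in the Spark program.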
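4) Run the program
Package the finished JavaWordCount program into a jar and upload it to a server in the Spark cluster.
Before running it, check whether the "/wordcount/output" directory already exists on HDFS and delete it if it does, because saveAsTextFile fails when the output directory already exists.
With the cluster running normally, submit the JavaWordCount program with the following command. Note that "--class" specifies the class to run, "com.xuebusi.spark.JavaWordCount":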
/root/apps/spark/bin/spark-submit \
--master spark://hadoop01:7077,hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 7 \
--class com.xuebusi.spark.JavaWordCount \
/root/spark-1.0-SNAPSHOT.jar \
hdfs://hadoop01:9000/wordcount/input \
hdfs://hadoop01:9000/wordcount/output
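The last two arguments of the command become args[0] (input path) and args[1] (output path) in the main method. JavaWordCount reads them without any check, so omitting one ends in an ArrayIndexOutOfBoundsException. A minimal sketch of an up-front check follows; the class WordCountArgs and its usage message are hypothetical and not part of the original program.

```java
// Hypothetical helper, not part of the original project: validates the two
// path arguments before any Spark objects are created.
final class WordCountArgs {
    final String inputPath;
    final String outputPath;

    private WordCountArgs(String inputPath, String outputPath) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
    }

    // Expects exactly two arguments: the input path and the output path.
    static WordCountArgs parse(String[] args) {
        if (args == null || args.length != 2) {
            throw new IllegalArgumentException(
                    "Usage: JavaWordCount <inputPath> <outputPath>");
        }
        return new WordCountArgs(args[0], args[1]);
    }

    public static void main(String[] args) {
        WordCountArgs parsed = parse(new String[] {
                "hdfs://hadoop01:9000/wordcount/input",
                "hdfs://hadoop01:9000/wordcount/output"
        });
        System.out.println(parsed.inputPath + " -> " + parsed.outputPath);
    }
}
```

In JavaWordCount, a call such as WordCountArgs.parse(args) at the top of main would make a missing argument fail with a readable usage message rather than an array index error.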