在进行实际的Spark应用程序开发时,常常会利用Window环境进行程序开发,开发测试好之后提交到Spark集群中利用bin/spark-submit脚本进行程序的发布运行,本教程教将一步一步地教大家如何进行该操作。本教程主要内容如下:
- Window开发环境说明
- Spark集群运行环境说明
- Scala IDE For Eclipse中Spark程序开发
- 利用spark-submit脚本提交到Spark集群当中运行
Windows开发环境说明
(1)Scala-IDE eclipse ,版本号见下图
(2) JAVA版本号 JDK 1.7
(3) Scala 版本号 2.10.4
Spark集群运行环境说明
(1)操作系统:Ubuntu 10.04
(2) Java与Scala版本号与Windows上一致
(3) Hadoop 版本 hadoop 2.2.0
(4) Spark 版本 Spark 1.1.0
配置如下:
IP地址 | 主机名 | 运行进程 |
---|---|---|
192.168.1.104 | cluster04 | QuorumPeerMain(ZooKeeper进程) Master(Spark Master进程) DataNode JournalNode ResourceManager(Yanr资源管理器) NodeManager Worker |
192.168.1.105 | cluster05 | NameNode QuorumPeerMain(ZooKeeper进程) Worker(Spark Worker进程) NodeManager DataNode DFSZKFailoverController(用于实现 NameNode HA) JournalNode |
192.168.1.106 | cluster06 | NameNode QuorumPeerMain(ZooKeeper进程) Worker(Spark Worker进程) NodeManager DataNode DFSZKFailoverController(用于实现 NameNode HA) JournalNode |
Scala IDE For Eclipse中Spark程序开发
(1) 在Scala IDE For Eclipse新建一个Scala Project,全名为:SparkWordCount
(2) 将spark-assembly-1.1.0-hadoop2.2.0.jar导入
(3)工程结构如下图
(4) 将UserPurchaseHistory.csv上传到HDFS根目录:hadoop fs -put /data/UserPurchaseHistory.csv /
UserPurchaseHistory.csv内容如下:
第一列表示客户姓名,第二列表示购买物品,第三列表示物品价格
(4)创建包cn.ml,并新建Scala object,全名为PurchaseProduct,代码如下:
package cn.ml
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object PurchaseProduct {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("PurchaseProduct")
val sc = new SparkContext(conf)
//从HDFS根目录中读取UserPurchaseHistory.csv文件
val data = sc.textFile("/UserPurchaseHistory.csv")
.map(line => line.split(","))
.map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))
//计算赎买数据
val numPurchases = data.count()
//计算客户数量
val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()
//商品价格合计
val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()
//找出最受欢迎的商品
val productsByPopularity = data
.map { case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)
// finally, print everything out
println("Total purchases: " + numPurchases)
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
sc.stop()
}
}
(5)将工程打包成Jar文件:SparkWordCount.jar
利用spark-submit脚本将程序提交到Spark集群当中运行
(1)将打包好的SparkWordCount.jar文件上传到Spark Master所在的机器cluster04的根目录上,然后运行下列脚本:
/spark-1.1.0/bin# ./spark-submit –master spark://itcast04:7077 –class cn.ml.PurchaseProduct /SparkWordCount.jar
–master 用于指定集群的master
–class 用于指定待运行的主类
(2) 运行结果图