最新版Spark 3 HelloWorld

Spark已经发布到3.1.1了,好久没看这个项目了.今天更新下本地仓库,编译下竟然出错了.

$mvn compile
......
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:403: type mismatch;
 found   : Map[String,org.apache.spark.resource.ResourceInformation]
 required: scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation]
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:404: type mismatch;
 found   : scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation]
 required: Map[String,org.apache.spark.resource.ResourceInformation]
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:554: overloaded method value apply with alternatives:
  (env: org.apache.spark.SparkEnv,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] <and>
  (sc: org.apache.spark.SparkContext,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer]
 cannot be applied to (org.apache.spark.SparkContext, java.util.Map[String,org.apache.spark.ResourceInformation])
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala:118: type mismatch;
 found   : java.util.Map[String,org.apache.spark.ResourceInformation]
 required: java.util.Map[String,org.apache.spark.resource.ResourceInformation]
[INFO] [Info] : java.util.Map[String,org.apache.spark.ResourceInformation] <: java.util.Map[String,org.apache.spark.resource.ResourceInformation]?
[INFO] [Info] : false
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:308: too many arguments (6) for method resolveMavenDependencies: (packagesExclusions: String, packages: String, repositories: String, ivyRepoPath: String, ivySettingsPath: Option[String])String
Note that 'packagesTransitive' is not a parameter name of the invoked method.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:377: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:380: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:383: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:392: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:396: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:450: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String.
Unspecified value parameter secMgr.
[ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/scheduler/Task.scala:101: type mismatch;
 found   : Map[String,org.apache.spark.resource.ResourceInformation]
 required: Map[String,org.apache.spark.ResourceInformation]
[INFO] [Info] : Map[String,org.apache.spark.resource.ResourceInformation] <: Map[String,org.apache.spark.ResourceInformation]?
[INFO] [Info] : false
[ERROR] 12 errors found
......

赶紧看了项目README,原来本地环境已经不能满足新Spark需要了,Spark项目也打包了相应的编译工具,按照文档编译就好了.

如果先前没下载过这个项目可以使用git clone先把项目下载到本地:

git clone https://github.com/apache/spark.git


进入项目目录并编译:

$cd spark
$./build/mvn -DskipTests clean package
......
[INFO] Reactor Summary for Spark Project Parent POM 3.2.0-SNAPSHOT:
[INFO] 
[INFO] Spark Project Parent POM ........................... SUCCESS [  2.562 s]
[INFO] Spark Project Tags ................................. SUCCESS [  5.148 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  5.963 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  1.505 s]
[INFO] Spark Project Networking ........................... SUCCESS [  2.883 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.516 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  7.137 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  1.516 s]
[INFO] Spark Project Core ................................. SUCCESS [01:55 min]
[INFO] Spark Project ML Local Library ..................... SUCCESS [ 36.128 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 30.925 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 53.579 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [03:50 min]
[INFO] Spark Project SQL .................................. SUCCESS [07:58 min]
[INFO] Spark Project ML Library ........................... SUCCESS [02:42 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 13.733 s]
[INFO] Spark Project Hive ................................. SUCCESS [04:52 min]
[INFO] Spark Project REPL ................................. SUCCESS [ 34.085 s]
[INFO] Spark Project Assembly ............................. SUCCESS [  8.368 s]
[INFO] Kafka 0.10+ Token Provider for Streaming ........... SUCCESS [01:06 min]
[INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [02:08 min]
[INFO] Kafka 0.10+ Source for Structured Streaming ........ SUCCESS [01:24 min]
[INFO] Spark Project Examples ............................. SUCCESS [01:01 min]
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 10.397 s]
[INFO] Spark Avro ......................................... SUCCESS [01:12 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  31:51 min
[INFO] Finished at: 2021-03-13T17:28:05+08:00
[INFO] ------------------------------------------------------------------------

从编译结果看Spark3项目结构还是变化挺大的,像Tags/Sketch/LocalDB/这些原来都没见过.具体每个子项目是做什么的,我们以后再一一介绍.接下来先运行下HelloWorld.试用Spark最简单的方式是使用scala shell:

tianlang@tianlang:spark$ ./bin/spark-shell 
2021-03-14 08:51:53,351 WARN util.Utils: Your hostname, tianlang resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface wlp7s0)
2021-03-14 08:51:53,352 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-03-14 08:52:01,141 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.104:4040
Spark context available as 'sc' (master = local[*], app id = local-1615683123916).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(1000*1000*1000).count();
res0: Long = 1000000000

spark.range(1000*1000*1000)是生成一个从0(包含)到10亿(不包含)的序列,可看成是一个只有一列的表t_range:

1
2
......
999999998
999999999


count()是计算下数据条数,有一条算一条总共是10亿条.相当于SQL的select count(*) from  t_range. SQL的count函数容易跟SQL的sum函数弄混,sum是把所有的数据加起来也就是取序列中所有数值的和:0+1+2+3+......+999999998+9999999999;而count是统计下序列中的数据条数有一条算一条,不管具体数值是1还是999999998:1+1+1+......+1+1;

在SparkShell中不能直接调用sum对序列中的数值求和:

scala> spark.range(1000*1000*1000).sum();
<console>:24: error: value sum is not a member of org.apache.spark.sql.Dataset[Long]
       spark.range(1000*1000*1000).sum();

因为spark.range生成的Dataset对象没有sum函数.那怎么实现求和操作呢?

可以查下Spark API文档,看下Dataset有那些函数可以使用:

最新版Spark 3 HelloWorld

首先看到的是跟sum长的比较像的summary.看下它的介绍如果英文看不懂也没关系,可以看示例代码.发现它可以用来统计总条数/平均数/最大值/最小值等就是没有取和.看来不是名字像功能就一样啊.

如果先前接触过大数据,应该听过MapReduce.那应该就比较容易找到reduce函数:

defreduce(func: (T, T) ⇒ T): T

(Scala-specific) Reduces the elements of this Dataset using the specified binary function. The given func must be commutative and associative or the result may be non-deterministic.

用reduce取和:

scala> spark.range(1000*1000*1000).reduce((a,b) => a+b)
<console>:24: error: overloaded method value reduce with alternatives:
  (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
  (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
 cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
       spark.range(1000*1000*1000).reduce((a,b) => a+b)

为reduce提供一个取和的函数还报错了,从错误信息可以看出是数据类型不匹配问题,来个强制类型转换吧:

scala> spark.range(1000*1000*1000).reduce((a,b) => (a+b).asInstanceOf[java.lang.Long])
res11: Long = 499999999500000000  

当然也可以使用其它的方式实现取和,比如:foreach,但执行方式跟reduce是有差别的,我们后面有机会再说. 

大家应该也感觉到了,使用reduce函数远没有SQL中的sum函数方便.SQL中的函数用现在比较流行的词叫声明式的API,只需要关注我要什么就可以了,而不需要像reduce一样还要我关注怎么干.

这也是SQL经久不衰的一个原因吧.Spark也很早就提供了Spark SQL模块用于支持SQL语法.可以回头看下我们先前使用的Dataset就是sql包下的:

scala> spark.range(1000*1000*1000);
res2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

我们先前也说可以把range结果类比成一个只有一列的表,也不是随便说说的.还真的可以在上面执行SQL语句:

首先把Dataset注册为临时视图(也可以叫临时表,但注册临时表的API在2.0.0后就标记为废弃了):

scala> spark.range(1000*1000*1000).createOrReplaceTempView("t_range");

接下来就可以对视图t_range执行SQL了:


scala> spark.sql("select sum(id) from t_range");
res18: org.apache.spark.sql.DataFrame = [sum(id): bigint]

scala> res18.collect
res19: Array[org.apache.spark.sql.Row] = Array([499999999500000000])

我是怎么知道列名称是id的?是通过printSchema函数.

scala> spark.range(1000*1000*1000).printSchema();
root
 |-- id: long (nullable = false)

上面的代码都是使用的Scala,如果更倾向于使用Python.也可以使用./bin/pyspark.Spark3对Python的支持也提到了一个新高度.

HelloWorld就先到这里吧.蚂蚁啃骨头一点一点来.


上一篇:微信小程序—经纬度逆地址解析的实现


下一篇:es6 javascript的class类中的 get和set