Spark已经发布到3.1.1了,好久没看这个项目了.今天更新下本地仓库,编译下竟然出错了.
$mvn compile ...... [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:403: type mismatch; found : Map[String,org.apache.spark.resource.ResourceInformation] required: scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation] [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:404: type mismatch; found : scala.collection.immutable.Map[String,org.apache.spark.ResourceInformation] required: Map[String,org.apache.spark.resource.ResourceInformation] [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/SparkContext.scala:554: overloaded method value apply with alternatives: (env: org.apache.spark.SparkEnv,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] <and> (sc: org.apache.spark.SparkContext,resources: java.util.Map[String,org.apache.spark.resource.ResourceInformation])Option[org.apache.spark.internal.plugin.PluginContainer] cannot be applied to (org.apache.spark.SparkContext, java.util.Map[String,org.apache.spark.ResourceInformation]) [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala:118: type mismatch; found : java.util.Map[String,org.apache.spark.ResourceInformation] required: java.util.Map[String,org.apache.spark.resource.ResourceInformation] [INFO] [Info] : java.util.Map[String,org.apache.spark.ResourceInformation] <: java.util.Map[String,org.apache.spark.resource.ResourceInformation]? [INFO] [Info] : false [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:308: too many arguments (6) for method resolveMavenDependencies: (packagesExclusions: String, packages: String, repositories: String, ivyRepoPath: String, ivySettingsPath: Option[String])String Note that 'packagesTransitive' is not a parameter name of the invoked method. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:377: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:380: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:383: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:392: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:396: not enough arguments for method downloadFileList: (fileList: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:450: not enough arguments for method downloadFile: (path: String, targetDir: java.io.File, sparkConf: org.apache.spark.SparkConf, hadoopConf: org.apache.hadoop.conf.Configuration, secMgr: org.apache.spark.SecurityManager)String. Unspecified value parameter secMgr. [ERROR] [Error] /data/code/github/bigdata/spark/core/src/main/scala/org/apache/spark/scheduler/Task.scala:101: type mismatch; found : Map[String,org.apache.spark.resource.ResourceInformation] required: Map[String,org.apache.spark.ResourceInformation] [INFO] [Info] : Map[String,org.apache.spark.resource.ResourceInformation] <: Map[String,org.apache.spark.ResourceInformation]? [INFO] [Info] : false [ERROR] 12 errors found ......
赶紧看了项目README,原来本地环境已经不能满足新Spark需要了,Spark项目也打包了相应的编译工具,按照文档编译就好了.
如果先前没下载过这个项目可以使用git clone先把项目下载到本地:
git clone https://github.com/apache/spark.git
进入项目目录并编译:
$cd spark $./build/mvn -DskipTests clean package ...... [INFO] Reactor Summary for Spark Project Parent POM 3.2.0-SNAPSHOT: [INFO] [INFO] Spark Project Parent POM ........................... SUCCESS [ 2.562 s] [INFO] Spark Project Tags ................................. SUCCESS [ 5.148 s] [INFO] Spark Project Sketch ............................... SUCCESS [ 5.963 s] [INFO] Spark Project Local DB ............................. SUCCESS [ 1.505 s] [INFO] Spark Project Networking ........................... SUCCESS [ 2.883 s] [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 1.516 s] [INFO] Spark Project Unsafe ............................... SUCCESS [ 7.137 s] [INFO] Spark Project Launcher ............................. SUCCESS [ 1.516 s] [INFO] Spark Project Core ................................. SUCCESS [01:55 min] [INFO] Spark Project ML Local Library ..................... SUCCESS [ 36.128 s] [INFO] Spark Project GraphX ............................... SUCCESS [ 30.925 s] [INFO] Spark Project Streaming ............................ SUCCESS [ 53.579 s] [INFO] Spark Project Catalyst ............................. SUCCESS [03:50 min] [INFO] Spark Project SQL .................................. SUCCESS [07:58 min] [INFO] Spark Project ML Library ........................... SUCCESS [02:42 min] [INFO] Spark Project Tools ................................ SUCCESS [ 13.733 s] [INFO] Spark Project Hive ................................. SUCCESS [04:52 min] [INFO] Spark Project REPL ................................. SUCCESS [ 34.085 s] [INFO] Spark Project Assembly ............................. SUCCESS [ 8.368 s] [INFO] Kafka 0.10+ Token Provider for Streaming ........... SUCCESS [01:06 min] [INFO] Spark Integration for Kafka 0.10 ................... SUCCESS [02:08 min] [INFO] Kafka 0.10+ Source for Structured Streaming ........ SUCCESS [01:24 min] [INFO] Spark Project Examples ............................. SUCCESS [01:01 min] [INFO] Spark Integration for Kafka 0.10 Assembly .......... SUCCESS [ 10.397 s] [INFO] Spark Avro ......................................... SUCCESS [01:12 min] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31:51 min [INFO] Finished at: 2021-03-13T17:28:05+08:00 [INFO] ------------------------------------------------------------------------
从编译结果看Spark3项目结构还是变化挺大的,像Tags/Sketch/LocalDB/这些原来都没见过.具体每个子项目是做什么的,我们以后再一一介绍.接下来先运行下HelloWorld.试用Spark最简单的方式是使用scala shell:
tianlang@tianlang:spark$ ./bin/spark-shell 2021-03-14 08:51:53,351 WARN util.Utils: Your hostname, tianlang resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface wlp7s0) 2021-03-14 08:51:53,352 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2021-03-14 08:52:01,141 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://192.168.0.104:4040 Spark context available as 'sc' (master = local[*], app id = local-1615683123916). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.2.0-SNAPSHOT /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(1000*1000*1000).count(); res0: Long = 1000000000
spark.range(1000*1000*1000)是生成一个从0(包含)到10亿(不包含)的序列,可看成是一个只有一列的表t_range:
1 |
2 |
...... |
999999998 |
999999999 |
count()是计算下数据条数,有一条算一条总共是10亿条.相当于SQL的select count(*) from t_range. SQL的count函数容易跟SQL的sum函数弄混,sum是把所有的数据加起来也就是取序列中所有数值的和:0+1+2+3+......+999999998+9999999999;而count是统计下序列中的数据条数有一条算一条,不管具体数值是1还是999999998:1+1+1+......+1+1;
在SparkShell中不能直接调用sum对序列中的数值求和:
scala> spark.range(1000*1000*1000).sum(); <console>:24: error: value sum is not a member of org.apache.spark.sql.Dataset[Long] spark.range(1000*1000*1000).sum();
因为spark.range生成的Dataset对象没有sum函数.那怎么实现求和操作呢?
可以查下Spark API文档,看下Dataset有那些函数可以使用:
首先看到的是跟sum长的比较像的summary.看下它的介绍如果英文看不懂也没关系,可以看示例代码.发现它可以用来统计总条数/平均数/最大值/最小值等就是没有取和.看来不是名字像功能就一样啊.
如果先前接触过大数据,应该听过MapReduce.那应该就比较容易找到reduce函数:
defreduce(func: (T, T) ⇒ T): T
(Scala-specific) Reduces the elements of this Dataset using the specified binary function. The given
func
must be commutative and associative or the result may be non-deterministic.
用reduce取和:
scala> spark.range(1000*1000*1000).reduce((a,b) => a+b) <console>:24: error: overloaded method value reduce with alternatives: (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and> (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long) spark.range(1000*1000*1000).reduce((a,b) => a+b)
为reduce提供一个取和的函数还报错了,从错误信息可以看出是数据类型不匹配问题,来个强制类型转换吧:
scala> spark.range(1000*1000*1000).reduce((a,b) => (a+b).asInstanceOf[java.lang.Long])
res11: Long = 499999999500000000
当然也可以使用其它的方式实现取和,比如:foreach,但执行方式跟reduce是有差别的,我们后面有机会再说.
大家应该也感觉到了,使用reduce函数远没有SQL中的sum函数方便.SQL中的函数用现在比较流行的词叫声明式的API,只需要关注我要什么就可以了,而不需要像reduce一样还要我关注怎么干.
这也是SQL经久不衰的一个原因吧.Spark也很早就提供了Spark SQL模块用于支持SQL语法.可以回头看下我们先前使用的Dataset就是sql包下的:
scala> spark.range(1000*1000*1000);
res2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
我们先前也说可以把range结果类比成一个只有一列的表,也不是随便说说的.还真的可以在上面执行SQL语句:
首先把Dataset注册为临时视图(也可以叫临时表,但注册临时表的API在2.0.0后就标记为废弃了):
scala> spark.range(1000*1000*1000).createOrReplaceTempView("t_range");
接下来就可以对视图t_range执行SQL了:
scala> spark.sql("select sum(id) from t_range");
res18: org.apache.spark.sql.DataFrame = [sum(id): bigint]scala> res18.collect
res19: Array[org.apache.spark.sql.Row] = Array([499999999500000000])
我是怎么知道列名称是id的?是通过printSchema函数.
scala> spark.range(1000*1000*1000).printSchema();
root
|-- id: long (nullable = false)
上面的代码都是使用的Scala,如果更倾向于使用Python.也可以使用./bin/pyspark.Spark3对Python的支持也提到了一个新高度.
HelloWorld就先到这里吧.蚂蚁啃骨头一点一点来.