Example 1: SparkPi, the Spark built-in sample project that computes Pi
This topic uses the Pi-computing sample project that ships with Spark to test whether the current EMR Spark environment is available. For details about the sample, see the EMR sample project usage guide.
Preparation:
Locate the Spark built-in example JAR spark-examples_2.11-2.4.5.jar. The Spark component is installed under /usr/lib/spark-current, so after logging in to the EMR cluster you can find the full path /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar. For details, see Common EMR file paths.
Run the job:
Create an EMR Spark node and submit the code for execution. You only need to fill in the part that follows spark-submit; job submission completes the command automatically.
Code to submit:
--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
What actually runs:
# spark-submit [options] --class [MainClass] xxx.jar args
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
Check the result:
The result 1097: Pi is roughly 3.1415547141554714 is returned, so the job ran successfully and the environment is available.
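The value is only approximate because SparkPi estimates Pi by Monte Carlo sampling: it scatters random points over a square and measures the fraction that lands inside the inscribed circle, which approaches Pi/4. A minimal Scala sketch of the same idea (not the exact source shipped with Spark; in the real SparkPi, the 100 passed above sets the number of partitions, each contributing a fixed number of samples):

import org.apache.spark.{SparkConf, SparkContext}

import scala.math.random

object PiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Pi Sketch"))
    val slices = 100          // corresponds to the "100" argument passed above
    val n = 100000 * slices   // total number of random samples
    // Count random points (x, y) in [-1, 1]^2 that fall inside the unit circle.
    val inside = sc.parallelize(1 to n, slices).filter { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      x * x + y * y <= 1
    }.count()
    // The circle-to-square area ratio is Pi/4, so Pi ≈ 4 * inside / n.
    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}

Because the estimate is statistical, each run prints a slightly different value near 3.14159.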
Example 2: Connecting Spark to MaxCompute
This example connects Spark to MaxCompute and uses Spark to count the rows of a MaxCompute table. For more application scenarios, see the EMR Spark development guide.
Cloud services involved in this example: a DataWorks workspace bound to both an EMR engine and a MaxCompute engine, and OSS.
Prepare test data:
In DataWorks DataStudio, create an ODPS SQL node and run the following statements to create the table and insert the data. The first column is of type BIGINT, and two records are inserted.
DROP TABLE IF EXISTS emr_spark_read_odpstable;

CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
(
    id   BIGINT,
    name STRING
);

INSERT INTO TABLE emr_spark_read_odpstable
VALUES (111, 'zhangsan'), (222, 'lisi');
Develop locally:
Create a Maven project and add the following dependency to the POM. For details, see Spark environment preparation.
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-maxcompute_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
The plugin section is for reference only.
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Write the code:
Implement a Spark job that counts the rows of the MaxCompute table by reading its first column (BIGINT). For details, see Connecting Spark to MaxCompute. When the code is ready, build the JAR; the ODPS-related dependencies are all third-party packages, so they must be bundled into the JAR and uploaded to the cluster together.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.aliyun.emr.example.spark

import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}

object SparkMaxComputeDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
          |
          |Arguments:
          |
          |    accessKeyId      Aliyun Access Key ID.
          |    accessKeySecret  Aliyun Key Secret.
          |    envType          0 or 1
          |                     0: Public environment.
          |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
          |    project          Aliyun ODPS project
          |    table            Aliyun ODPS table
          |    numPartitions    the number of RDD partitions
        """.stripMargin)
      System.exit(1)
    }

    val accessKeyId = args(0)
    val accessKeySecret = args(1)
    val envType = args(2).toInt
    val project = args(3)
    val table = args(4)
    val numPartitions = args(5).toInt

    val urls = Seq(
      Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"),        // public environment
      Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
    )

    val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
    val sc = new SparkContext(conf)
    val odpsOps = envType match {
      case 0 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
      case 1 =>
        OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
    }

    val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    println(s"Count (odpsData): ${odpsData.count()}")
  }

  def read(record: Record, schema: TableSchema): Long = {
    record.getBigint(0)
  }
}
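In the demo, the read callback maps every MaxCompute Record to the value of its first BIGINT column, and odpsOps.readTable produces an RDD of whatever the callback returns, so counting that RDD counts the table rows. As a hypothetical illustration of the same callback API (assuming the Record accessor getString(int), analogous to the getBigint(int) used above), a variant that keeps both columns could be added inside the object:

// Hypothetical variant, for illustration only: map each Record to both
// columns, so readTable yields an RDD[(Long, String)] instead of RDD[Long].
def readRow(record: Record, schema: TableSchema): (Long, String) =
  (record.getBigint(0), record.getString(1))

// Usage sketch, mirroring the readTable call in main:
//   val rows = odpsOps.readTable(project, table, readRow, numPartitions)
//   rows.foreach { case (id, name) => println(s"$id -> $name") }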
Upload the run resource:
Log in to the OSS console and upload the JAR resource to the target path (first-time use requires one-click authorization; for details, see the one-click authorization section of the EMR MR node documentation).
In this example, emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar is uploaded under oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/.
Note: the DataWorks EMR resource size limit is 50 MB, and a JAR with dependencies is usually larger than that, which is why the upload is done directly in the OSS console. If your resource is smaller than 50 MB, you can instead create and use an EMR JAR resource in DataWorks.
Create an EMR JAR resource:
This example creates the resource emr_spark_demo-1.0-SNAPSHOT.jar by uploading the JAR built above, stored under oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/ (first-time use requires one-click authorization). Submit the resource; after submission you can view it in the OSS console. For details, see Create and use an EMR JAR resource.
Create and run the EMR Spark node:
In this example, in the EMR data development module of the workflow, right-click to create an EMR Spark node named emr_spark_odps, select the EMR engine instance, enter the following code, and click Advanced Run.
Replace the Arguments placeholders with the values you actually use.
Code to submit:
--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
Check the result:
Check the logs: the table record count is 2, as expected.