MaxCompute Spark 使用及常见问题|学习笔记

开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:MaxCompute Spark 使用及常见问题】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1047


MaxCompute Spark 使用及常见问题


目录

● MaxCompute Spark 介绍

● 开发环境搭建及常用配置

● 作业诊断

● 常见问题

 

一、MaxCompute Spark 介绍

MaxCompute Spark 是 MaxCompute 提供的兼容开源的 Spark 计算服务。

它在统一的计算资源和数据集权限体系之上,提供 Spark 计算框架,支持用户以熟悉的开发使用方式提交运行 Spark 作业,以满足更丰富的数据处理分析场景。

主要特性

■ 支持原生多版本 Spark 作业

■ 统一的计算资源

■ 统一的数据和权限管理

■ 与开源 Spark 相同的使用体验

适用场景∶

■离线计算场景∶GraphX、Mllib、RDD、Spark-SQL、PySpark

■ Streaming 场景

■读写 MaxCompute Table

■引用 MaxCompute Resource

■读写 VPC 环境下的服务,如 RDS、Redis、HBase、ECS 上部署的服务

■读写 OSS 非结构化存储使用限制

■不支持交互式类需求 Spark-Shell、Spark-SQL-Shell、PySpark-Shell 等

■不支持访问 MaxCompute 外部表,函数和 UDF

MaxCompute Spark 使用及常见问题|学习笔记

开发环境搭建

通过 Spark 客户端提交

(1)Yarn-Cluster 模式

下载 MC Spark 客户端

环境变量配置

√ JAVA HOME/SPARK_HOME/PATH

参数配置

√ Spark 社区版参数

√ MaxCompute 平台相关的参数

准备项目工程

√ Java/Scala∶ Jar 包

√ PySpark∶ Python 文件

任务提交

√./bin/spark-submit--master yarn-cluster

(2)Local 模式

./bin/spark-submit --master local[4]

IDEA调试

通过 Dataworks 提交

资源上传并提交

√ Java/Scala∶Jar包

√ PySpark∶ Python文件

√ File/Archive

参数配置

√主Jar+主类配置(Java/Scala)

√ 主Python文件( PySpark)

√ Spark参数配置(资源申请/平台相关/网络访问相关)

√参数配置(如果需要)

√选择文件/Archive/Jar/Python(如果需要)

任务提交并运行

√将资源和参数组合为 spark-submit 命令,本质上也是 Yarn-Cluster 模 式

 

二.开发环境搭建

2.1运行模式

● 通过Spark 客户端提交

Yarn-Cluster 模式,提交任务到 MaxCompute 集群中

Local模式

●通过 Dataworks 提交

本质上也是Yarn-Cluster模式,提交任务到 MaxCompute 集群中

2.2 通过客户端提交

2.2.1 Yarn-Cluster 模式

●下载 MC Spark 客户端

Spark 1.6.3

Spark 2.3.0

● 环境变量配置

参数配置

将 $SPARK_HOME/conf/spark-defaults.conf.template 重命名为 spark-defaults.conf

参数配置参考下文

●准备项目工程

1git clone https://github.com/aliyun/MaxCompute-Spark.git

2 cd spark-2.x

3 mvn clean package

●任务提交

2.2.2 Local 模式

●与Yarn Cluster 模式类似,用户首先需要做以上准备工作

●任务提交

IDEA 调试注意

IDEA 运行 Local 模式是不能直接引用 spark-defaults.conf 里的配置,需要手动在代码里指定相关配置

一定要注意需要在 IDEA 里手动添加 MaxCompute Spark 客户端的相关依赖(jars目录),否则会出现以下报错∶

the value of spark.sql.catalogimplementation should be one of hive in-memory but was odps

2.3 通过DataWorks提交

2.3.1资源上传

●本质上 MC Spark 节点的配置对应 于 spark-submit 命令的参数和选项

● 上传资源∶

0~50MB∶ 可以直接在 DataWorks 界面创建资源并上传

50MB~500MB∶ 可以先利用 MaxCompute 客户端(CMD)上传,然后在DataWorks 界面添加到数据开发,参考文档

● 资源引用∶

资源提交后,可以在 DataWorks Spark 节点界面选择需要的资源(jar/python/file/archive)

任务运行时∶资源文件默认会上传到 Driver 和 Executor 的当前工作目录

2.3.2 参数和配置

● Spark 配置项∶对应于 spark-submit 命令的 --conf 选项

>accessid,accesskey,projectname,endpoint,runtime.end.point,task.major.version 无需配置

>除此之外,需要将 spark-default.conf 中的配置逐条加到 dataworks 的配置项中

●给主类传参数(如 bizdate)

>首先在调度->参数中添加参数,然后在 Spark 节点"参数"栏引用该参数。多个参数用空格分隔

>该参数会传给用户主类,用户在代码中解析该参数即可

>参考文档

3.1配置的位置

3.1.1 Spark 配置的位置

●用户使用 Maxcompute Spark 通常会有几个位置可以添加 Spark 配置,主要包括∶

位置1∶spark-defaults.conf,用户通过客户端提交时在 spark-defaults.conf 文件中添加的 Spark 配置

位置2∶dataworks 的配置项,用户通过 dataworks 提交时在配置项添加的 Spark 配置,这部分配置最终会在位置3中被添加

位置3∶配置在启动脚本 spark-submit --conf 选项中

位置4∶ 配置在用户代码中,用户在初始化 SparkContext 时设置的 Spark 配置

●Spark配置的优先级

用户代码> spark-submit --选项> spark-defaults.conf配置>spark-env.sh配置>默认值

3.1.2 需要区分的两种配置

● 一种是必须要配置在 bspark-defaults.conf 或者 dataworks 的配置项中才能生效、而不能配置在用户代码中、

这类配置主要的特征是∶

与Maxcompute/Cupid平台相关∶ 一般参数名中都会带odps或者cupid,通常这些参数与任务提交/资源

 

申请都关系∶

●显而易见,一些资源获取(如driver的内存,core,diskdriver,maxcompute资源) 在任务执行之前就会用到,如果这些参数设置在代码中,很明显平台没有办法读到,所以这些参数一定不要配置在代码中.

 

三、配置介绍

配置的位置

■ spark-defaults.conf(通过Spark客户端提交)■ spark-submit --confi选项(通过Spark客户端提交)

■ Dataworks配置项(本质上是 spark-submit -conf)

■ 代码中初始化 SparkContext 时设置

配置的优先级

■ 用户代码>spark-submit--选项>spark-defaults.conf配置> spark-env.sh配置>默认值区分两种类型的参数

■ 必须要配置在 spark-defaults.conf 或者 Dataworks 的配置项中才能生效(任务启动之前需要)

√访问 VPC 相关的网络白名单配置

√ 资源申请相关的配置(包括 cores/memory/disk_size)

√ MaxCompute 的 project/ 账号 /endpoint/resource

√spark.master

■ 配置在代码中可以生效的(任务运行中才需要)√ Spark 相关的优化参数

■ 推荐把任务运行与优化相关的参数配置在代码中,与资源/平台相关的参数配置在spark-defaults.conf 或 Dataworks 配置项中

 

四、作业诊断

Logview

■ 链接位置∶

√在任务提交时会打印日志∶日志中含有 logview 链接 (关键字 logview url)

■ 主要信息:

√ StdErr 打印的是 spark 引擎输出的日志

√ StdOut 中打印用户作业输出到控制台的内容

Spark UI 和 Spark History UI

■ Spark UI

√ 任务运行时可用

■ Spark History UI

√任务结束后可用,渲染有一定延

4.1 Logview

4.1.1 Logview 介绍

· 在任务提交时会打印日志∶日志中含有 logview链接(关键字 logview url)

· Master 以及 Worker 的 StdErr 打印的是 spark 引擎输出的日志,StdOut 中打印用户作业输出到控制台的内容

 

4.1.2 利用 Logview 排查问题

● 拿到 Logview,一般首先看 Driver 的报错,briver 会包含一些关键性的错误

● 如果 Driver 中出现个中类或者方法找不到的问题,一般是 jar 包打包的问题

● 如果 Driver 中出现连接外部 VPC 或者 OSS 出现 Time out,这种情况一般要去排查一下参数配置

● 如果 Driver 中出现连接不到 Executor,或者找不到 Chunk 等错误,通常是Executor 已经提前退出,需要进一步查看 Executor 的报错,可能存在 OOM

1.根据 End Time 做排序,结束时间越早,越容易是发生问题的 Executor 节点

2.根据 Latency 做排序,Latency 代表了 Executor 的存活的时间,存活时间越短的,越有可能是根因所在

4.2 Spark Ul和HistoryServer

· Spark UI 与社区版一致;在 logivew 的 summary 模块下找到 Spark UIl 链接∶

· Spark UI 的使用与社区原生版是一致的,可以参考文档

· 注意

>Spark UI 需要鉴权,只有提交任务的 Owner 才能打开

>Spark UI 仅在作业运行时才能打开,如果任务已经结束,那么 Spark UI是无法打开的,这时候需要查看 Spark History Server Ul


五.常见问题

1.local 模式运行的问题

·问题—∶the value of spark.sql.catalogimplementation should be one of hive in-memory but was odps

>原因在于用户没有正确地按照文档将 Maxcompute Spark 的 jars 目录添加到类路径,导致加载了社区版的 spark 包,需要按照文档将 jars 目录添加到类路径

· 问题二∶ IDEA Local 模式是不能直接引用 spark-defaults.conf 里的配置,必须要把 Spark 配置项写在代码中

问题三∶访问 OSS 和 VPC∶

> Local 模式是处于用户本机环境,网络没有隔离。而Yarn-Cluster 模式是处于Maxcompute 的网络隔离环境中、必须要要配置vpc访问的相关参数

> Local 模式下访问 oss 的 endpoint 通常是外网 endpoint,而 Yarn-cluster 模式下访问 vpc 的 endpoint 是经典网络 endpoint

2.jar 包打包的问题

· java/scala 程序经常会遇到 Java 类找不到/类冲突问题∶

>类冲突∶用户 Jar 包与 Spark 或平台依赖的 Jar 包冲突

>类没有找到∶用户 Jar 包没有打成 Fat Jar 或者由于类冲突引起

 

● 打包是需要注意∶

> 依赖为 provided 和 cornpile 的区别∶

■ provided∶代码依赖该 jar 包,但是只在编译的时候需要用,而运行时不需要,运行时会去集群中去

寻找的相应的 jar 包

■ compile∶代码依赖该 jar 包,在编译、运行时候都需要,在集群中不存在这些 jar包,需要用户打到自己的jar包中。这种类型的 jar 包一般是一些三方库,且与 spark 运行无关,与用户代码逻辑有关

> 用户提交的 jar 包必须是 Fat jar∶

■ 必须要把 compile 类型的依赖都打到用户 jar 包中,保证代码运行时能加载到这些依赖的类

● 需要设置为 provided 的 jar 包

> groupld为org.apache.spark的Jar包

>平台相关的Jar包

■ cupid-sdk

■hadoop-yarn-client

■ odpS-sdk

 

需要设置为 compile 的 jar 包

>oss 相关的 jar 包

■ hadoop-fs-oss

>用户访问其他服务用到的 jar 包∶

■ 如 mysql, hbase

> 用户代码需要引用的第三方库

 

3.需要引入 Python 包

●很多时候用户需要用到外部 Python 依赖

,首先推荐用户使用我们打包的公共资源,包含了常用的一些数据处理,计算,以及连接外部服务(mysql,redis,hbase)的三方库

> 如果不能满足用户需要,用户可以在该公共资源的基础上上传 wheel 包

>如果 wheel 包依赖链较为复杂,可以通过 Docker 容器进行打包

●使用 Docker 容器打包∶

>为了保证与线上环境一致,避免运行时 so 包找不到的问题,需要使用 Docker 容器进行打包

>Docker 容器本质只是提供了兼容性较好的 os 环境,用户需要在容器中进行打包,并将整个 Python 目录压

缩后上传到 MaxCompute Resource 中,最后在 Spark 任务中直接引用即可

>参见文档

4.需要引入外部文件

● 需要引用到外部文件的场景

>用户作业需要读取一些配置文件

>用户作业需要额外的 jar 包/Python 库

●可以通过两种方式上传资源∶

>通过 Spark 参数上传文件

> 通过 MaxCompute Resource 上传文件

● 通过 Spark 参数上传文件

> MaxCompute Spark 支持 Spark 社区版原生的 --jars,--py-files 等参数,可以在作业提交时通过这些参数将文件上传,这些文件在任务运行时会被上传到用户的工作目录下

> 通过 DataWorks 添加任务需要的资源,参见上文

●MaxCompute Resource

> spark.hadoop.odps.cupid.resources 参数,可以直接引用 MaxCompute 中的资源,这些资源在任务运行时也会被上传到用户的工作目录下

>使用方式

(1)通过 MaxCompute 客户端将文件上传(单个文件最大支持500MB)

(2)在 Spark 作业配置中添加 spark.hadoop.odps.cupid.resources 参数∶格式为. ,如果需要引用多个文件,需要用逗号隔开

(3)如果需要重命名,格式为,∶

●如何读取上传的文件∶

>如果需要读取上传的文件资源

>或者直接通过类加载器获取文件路径,然后再读取

>参者文档

5.VPC 访问的问题

●Maxcompute Spark 是独立运行在 Maxcompute 集群的,网络与外界隔离,因此无法直接访问 vpc 和公网,需要添加以下配置。

●北京和上海 Region 使用 smartnat

>需要配置

■ spark.hadoop.odps.cupid.vpc.domain.list

■ spark.hadoop.odps.cupid.smartnat.enable=true

>访问公网∶假如要访问google.com∶443,需要做以下两步∶

■ 提工单设置 project 级别白名单,把 google.com∶443 加到odps.security.outbound.internetlist

■ 配置作业级别的公网访问白名单∶spark.hadoop.odps.cupid.internet.access.list=google.com∶443

● 其他Region∶

>只需要配置spark.hadoop.odps.cupid.vpc.domain.list

>无法访问公网

 

● 注意事项∶

>vpc.domain.list 需要压缩成一行,不能包含空格

>支持同时访问同一个 Region 下的多个 VPC,需要配置所有要访问的 ip∶port 的白名单

>需要在要访问的服务中添加 ip 白名单,允许 100.104.0.0/16 网段的访问

>用户要保证所有可能访问到的 IP 都已经加到 vpc.domain.list,例如如果用户要访问位于 hdfs,hbase 这种多个节点的服务,一定要把所有的节点都添加进来,不然可能会遇到 Time out

 

6.OOM 的问题

● 可能出现 OOM 的情况∶

> 错误1∶在某些 Executor 中出现 Cannot allocate memory,一般是系统内存不足,此时可以调整

spark.yarn.executor.memoryOverhead 参数,注意该参数是会计算到总内存数的,也不需要一次性增加太多,小心调整即可

>错误2∶Executor抛出java.lang.OutOfMemoryError∶ Java heap space

>错误3∶ GC overhead limit exceeded

> 错误4∶ No route to host∶workerd********/Could not find CoarseGrainedScheduler,这类错误一般是一些 Executor 提前退出。如果一个task 处理的数据非常大,容易发生 OOM

● Driver OOM∶Driver OOM 的可能性比较小,但是也是有可能出现的

>如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理,那么必须确保 Driver 的内存足够大,否则会出现 OOM 内存溢出的问题。

SparkContext,DAGScheduler 都是运行在 Driver 端的。Stage 切分也是在Driver端运行,如果用户程序有过多的步骤,切分出过多的Stage.这部分信息消耗的是Driver的内存.这个时候就需要调大Driver的内存。有时候如果stage过 多,Driver端可能会有栈溢出的问题

 

●一些解决方法∶

>限 制 executor 并行度,将 cores 调小∶多个同时运行的 Task 会共享一个Executor 的内存,使得单个 Task 可使用的内存减少,调小并行度能缓解内存压力

>增加单个Executor 内存

>增加分区数量,减少每个 exeCutor 负载

>考虑数据倾斜问题,因为数据倾斜导致某个 task 内存不足,其它 task 内存足够

 

7. No space left on device

●这个错误意味这本地磁盘不足,通常这个报错会在 executor 上出现,并导致executor 挂掉

● 解决方案

>直接增加更多的磁盘空间∶默认 driver 和 executor 都各提供 20g 的本地磁盘,当磁盘空间不足时可以调整 spark.hadoop.odps.cupid.disk.driver.device_size

>如果调整本地磁盘大小到 100g 后,仍然报该错误,说明单个 executor 写的shuffle 数据已经超过上限,可能是遇到了数据倾斜,这种情况下可以对数据重分区。或者增加 executor 的数量

8.申请资源的问题

●申请不到资源的几种现象∶

(1)在 driver 端一般会打以下日志

>WARN YarnClusterScheduler: Initial job has not accepted any resources;check your cluster UI to ensure that workers are registered and have sufficient resources

(2)在 logview 中只能看到 driver,而 worker 数量为0

(3)在 spark ui 中只能看到 driver,而 worker 数量为0

●解决方案∶

>调整任务资源∶调整用户申请的 executor 总数或者单个 executor 的资源数量(一般是内存),如果单个executor请求的内存过多可能不太容易申请到>合理安排任务执行时间

● 其他注意事项∶

>必须配置 spark.master=yarn-cluster 才会正确的申请资源

9.其他问题

●如何切换 Spark 版本

>版本号规则介绍∶示例 spark-2.3.0-odps0.32.5

■ spark-2.3.0 是社区版本的 spark 版本号,Maxcompute Spark 基于该社区版本进行适配

■ odps0.32.5 是 Maxcompute Spark 的小版本号,随着小版本号的升级,可能进行一些 bug 修复和 sdk 的升级

> 用户提交作业的的 Spark 版本可能有以下几种情况∶

■ 情况1∶直接通过本地客户端提交任务,spark 版本就是用户本地客户端的版本

■ 情况2∶用户通过 dataworks 提交任务,取决于 dataworks gateway 的默认 spark版本,当前公共云 dataworks 公共资源组 gateway 的默认版本是spark-2.3.0-odps0.32.1

■情况 3 ∶用户通过 dataworks 提交任务,配置参数spark.hadoop.odps.spark.version,则会按照配置的版本号来寻找对应的 spark 客户端,用户可以配置 spark.hadoop.odps.spark.version=spark-2.3.0-odps0.32.5 手动切换版本

■ 情况4∶该情况优先级最高,用户可以在本地客户端或者是 dataworks 提交任务时配置以下参数,则类加载的优先级最高,因此会在 spark 任务启动时优先使用该版本的spark

spark.hadoop.odps.cupid.resources = public.__spark_libs__2.3.0odps0.32.5.zip

spark.driver.extraClassPath=./public.spark_libs_2.3.0odps0.32.5.zip/*/34:52

● 需要在代码中访问配置项∶

>spark 开头的参数直接通过 SparkConf 类提供的接口直接读取即可

Server 渲染速度慢

可以添加压缩配置∶ spark.eventLog.compress=true

●如何正确地 Kill 一个运行中的 Spark 任务

通常通过两种方式kil正在运行的 Spark 任务

(1)通过 odps cmd 执行 kill + instanceld;

(2)通过 dataworks 界面执行 stop

注意,直接在 spark 客户端或者 dataworks 的任务提交界面执行 Ctrl+ C 是无法 kill一个 S park 任 务的

●日志中文乱码,添加以下配置

1. spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8-Dsun.jnu.encoding=UTF-8

2.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8-Dsun.jnu.encoding=UTF-8

3.如果是 pyspark 作业需要设置下如下两个参数∶

​​ spark.yarn.appMasterEnv.PYTHONIOENCODING=utf8

spark.executorEnv.PYTHONIOENCODING=utf8

■ 另外在 python 脚本的最前面加上如下的代码∶


六.相关文档

● MC Spark github wiki: https:/github.com/aliyun/MaxCompute-Spark/wiki

● Spark UI: https://spark.apache.org/docs/latest/web-ui.html

● Spark 配置∶https∶//sparkapache.org/docs/2.4.5/configuration.html

上一篇:如何在ACK中进行自动化应用灰度发布


下一篇:Nest开源Thread协议