Spark on Kubernetes原生支持浅析

作者简介:无咎,阿里云EMR技术专家。目前专注于大数据开发平台建设。

概述

Kubernetes自推出以来,以其完善的集群配额、均衡、故障恢复能力,成为开源容器管理平台中的佼佼者。从设计思路上,Spark以开放Cluster Manager为理念,Kubernetes则以多语言、容器调度为卖点,二者的结合是顺理成章的。

使用Kubernetes调度Spark的好处:

  • 集中式资源调度:接入k8s的Spark应用与其他k8s应用共享资源池。
  • 多租户:可利用Kubernetes的namespace和ResourceQuota做用户粒度的资源调度。
  • 容器生态:以监控为例,开发者可利用Prometheus检测Spark应用的性能。

Kubernetes社区早期尝试将Spark以standalone模式运行在Kubernetes上。SPARK-18278则提出了一个子项目apache-spark-on-k8s,旨在支持Spark driver/executor的Pod化。该项目于2018年正式合并到主版本,在Spark 2.3发布。

Spark on Kubernetes原生支持浅析

(图片来自databricks.com)

Spark 2.4的Kubernetes支持包含以下特性:

  • 支持pyspark应用
  • 支持R语言应用
  • 支持client mode:允许用户运行spark-shell或Notebook,可以是集群之外的单独机器,或者是k8s集群的pod。client mode要求用户保证driver与k8s集群内executor之间的连通性。如果driver运行在ks8集群的pod内,推荐使用headless service以允许executor通过FQDN连接到driver;如果driver运行在k8s集群之外,用户需要确保集群内的executor Pod可访问到driver。

kubernetes operator则是k8s上运行Spark另一种的途径,用户可以通过Helm chart安装。运行时转换为CRD对象执行。

以下我们以spark-2.4.3-bin-hadoop2.7为例,使用minikube实验Spark on Kubernetes。(minikube是Kubernetes官方工具,可运行单节点k8s服务,方便在本机上测试。)

在minikube中测试Spark on k8s

最低要求:

  • Spark 2.3+版本,client mode需要2.4+。
  • kubernetes 1.6以上版本,可调用kubectl。
  • Kubernetes启动DNS。

cluster mode

1. 启动minikube

运行Spark应用需要足够的系统资源,执行以下命令并重新启动minikube

minikube config set memory 4096
minikube config set cpus 4

或者显式指定资源启动minikube

minikube start --cpus 4 --memory 4096

2. 构建Spark镜像

为简便起见,直接在本地k8s环境中构建镜像,以便k8s运行时直接取用。注意:构建时必须位于Spark安装根目录下。

./bin/docker-image-tool.sh -r <repo> -t my-tag build

或者

eval $(minikube docker-env)
docker build -t spark:2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile  .

也可以将构建镜像推送到k8s集群可达的镜像仓库中。

3. 执行examples

先为Spark应用配置必要的serviceAccount。

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=admin --serviceaccount=default:spark --namespace=default

master地址可以通过kubectl获取

kubectl cluster-info

本机执行spark-submit

./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

启动控制台 minikube dashboard,在可以在界面上看到对应的pods

Spark on Kubernetes原生支持浅析

我们可以通过kubectl查看其运行日志。程序运行结束之后,driver Pod仍然保留。

kubectl -n=default logs -f spark-pi-4e673e6fc64432bb8bda3f5632ce9596-driver

client mode

以下演示在k8s集群的独立Pod中启动Spark应用和spark-shell的方法。

1. 准备独立Pod

用任意linux镜像运行。

kubectl run transfer1 -it -n default --image=centos:centos7 --serviceaccount='spark' -- /bin/sh

如果可以进入命令行,说明Pod已经启动,查看一下Pod列表。

kubectl get pods
NAME                                               READY   STATUS             RESTARTS   AGE
transfer1-584c678c8c-fh8s6                         1/1     Running            0          12m

进入transfer1 Pod,安装OpenJDK和Spark。

2. 暴露service

我们任意指定一个端口暴露,后续client mode将通过去DNS去查找Driver Pod的位置,这也是Spark on k8s要求DNS的原因。

kubectl expose deployment transfer1 --port=19987 --type=ClusterIP --cluster-ip=None
kubectl get services
NAME                                                   TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
kubernetes                                             ClusterIP   10.96.0.1    <none>        443/TCP             41h
transfer1                                              ClusterIP   None         <none>        19987/TCP           13m

3. 在Pod中启动Spark应用

./bin/spark-submit \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local  \
--conf spark.driver.port=19987  \
/root/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar

注意运行入口指向是本地的Pod的jar包。

启动spark-shell:

./bin/spark-shell  \
--master k8s://https://192.168.99.103:8443 \
--deploy-mode client  \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark  \
--conf spark.kubernetes.namespace=default  \
--conf spark.driver.pod.name=transfer1driverpod  \
--conf spark.executor.instances=2  \
--conf spark.kubernetes.container.image=spark:2.4.3 \
--conf spark.driver.host=transfer1.default.svc.cluster.local \
--conf spark.driver.port=19987

稍等片刻spark-shell即启动完成。

Pod创建原理

镜像

先了解Spark镜像做了什么。打开Spark发行目录下的kubernetes/dockerfiles/spark/Dockerfile,可以发现,Dockerfile只做了jars、bin、sbin等目录的文件拷贝,指向/opt/entrypoint.sh作为镜像入口。

entrypoint.sh支持传入"driver"或者"executor"参数(对于python和R支持,则是dirver-py和dirver-r),这样,默认容器即支持创建driver或者executor容器。无需用户显式提供spark.kubernetes.driver.container.image参数。

组件

  • Driver: 以headless service存在。
  • Executor: 数量可以由spark-submit参数指定,也支持动态资源配置。
  • k8s API Server: Spark通过API Server创建与删除Pod。

spark-submit之后

再看spark-submit,SparkSubmit类会匹配master参数,如果以"k8s"开头,则会装载对应的submit类,对于Spark 2.4,这个submit类是org.apache.spark.deploy.k8s.submit.KubernetesClientApplication

KubernetesClientApplication中创建driver Pod:

// resolvedDriverPod是基于KubernetesDriverBuilder,读取conf创建的Pod定义
kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
      try {
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
           // 如果创建失败则移除driver Pod
        case NonFatal(e) =>
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

另一方面,与Spark on YARN的初始化过程类似,SparkContext装载ExternalClusterManager的子类KubernetesClusterManager,并初始化ExecutorPodsAllocator和k8s的DefaultKubernetesClient,初始化KubernetesClusterSchedulerBackend

KubernetesClusterSchedulerBackend继承了CoarseGrainedSchedulerBackend, 请求创建executor时,其重载的doRequestTotalExecutors方法,使用PodAllocator中的内部线程创建executor Pod,后者会自动增减executor数量;killExecutors时调用doKillExecutors来销毁Pods。

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
    podAllocator.setTotalExpectedExecutors(requestedTotal)
    true
  }

...  

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
    kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId())
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
      .delete()
    // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
  }

总结

Kubernetes为Spark开发者提供了新的调度手段,Spark 2.4支持cluster/client mode运行,client mode可以运行在单独Pod或者k8s集群之外。

Spark on Kubernetes项目正在快速发展之中,目前支持的功能仍然是实验性质的,未来其内部实现可能会发生变化,对于Spark 3.0版本中的k8s新特性,我们不妨拭目以待。

[扩展阅读]

  1. Spark on k8s早期工作:https://issues.apache.org/jira/browse/SPARK-18278
  2. Lyft在SparkAI Summit上分享k8s实践: https://www.youtube.com/watch?v=PPtrY_XxYBE
  3. 关于Spark Operator的设计,参考:https://yq.aliyun.com/articles/695315,或者阅读官方文档:https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
上一篇:使用spark-redis组件访问云数据库Redis


下一篇:分布式快照算法: Chandy-Lamport