Spark集群的三种模式

文章目录

1、Spark的由来

  • 定义:Hadoop主要解决,海量数据的存储和海量数据的分析计算。Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

1.1 Hadoop的发展

  • Hadoop1.x存在的问题:
    • NameNode不能高可用
    • MR框架中资源调度和任务调度耦合
    • MR基于磁盘计算,效率低
  • Hadoop2.x对应的解决了以上几个问题
    • NameNode高可用
    • 将资源调度和任务调度解耦
    • 计算框架可插拔

Spark框架诞生早于Yarn,所以Spark自己设计了一套资源调度框架。

1.2 MapReduce与Spark对比

MR不适合迭代计算

Spark集群的三种模式

Spark支持迭代计算和图形计算:因为Spark中间结果不落盘。但是Shuffle也得落盘。

Spark集群的三种模式

2、Spark内置模块

Spark集群的三种模式

Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。

Spark SQL:是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的HQL来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。

Spark Streaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。

Spark MLlib:提供常见的机器学习功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。

Spark GraphX:主要用于图形并行计算和图挖掘系统的组件。

集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器

3、Spark运行模式

  • Local模式:本地调试

  • Standalone模式:Spark自带的任务调度模式

  • Yarn模式:使用Yarn进行资源调度和任务调度

3.1 Standalone模式部署

  • 集群规划
hadoop102 hadoop103 hadoop104
Spark Master、Worker Worker Worker

具体步骤:

  1. 解压安装包至指定目录

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
    mv spark-3.0.0-bin-hadoop3.2/ spark-standalone
    
  2. 修改配置文件

    slaves

    hadoop102
    hadoop103
    hadoop104
    

    spark-env.sh

    SPARK_MASTER_HOST=hadoop102
    SPARK_MASTER_PORT=7077
    
  3. 分发spark-standalone

    xsync spark-standalone/
    
  4. 在hadoop102上启动集群

    sbin/start-all.sh
    
  5. jps查看启动情况

  6. 测试

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077 \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

    参数含义

    参数 解释 可选值举例
    –class Spark程序中包含主函数的类
    –master Spark程序运行的模式 本地模式:local[*]、spark://hadoop102:7077、Yarn
    –executor-memory 1G 指定每个executor可用内存为1G 符合集群内存配置即可,具体情况具体分析。
    –total-executor-cores 2 指定所有executor使用的cpu核数为2个
    application-jar 打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
    application-arguments 传给main()方法的参数

配置历史服务器

  1. 修改配置文件

    spark-defaults.conf

    spark.eventLog.enabled          true
    spark.eventLog.dir              hdfs://hadoop102:8020/directory
    

    spark-env.sh

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080 
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
    -Dspark.history.retainedApplications=30"
    

    # 参数1含义:WEBUI访问的端口号为18080

    # 参数2含义:指定历史服务器日志存储路径(读)

    # 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  2. 重新分发修改的配置文件

    xsync spark-defaults.conf spark-env.sh
    
  3. 重启spark及其历史服务器

    sbin/stop-history-server.sh
    sbin/stop-all.sh
    
    sbin/start-all.sh
    sbin/start-history-server.sh
    
  4. 查看Spark历史服务器hadoop102:18080

配置高可用

  1. 停止集群

  2. 启动ZooKeeper

  3. 修改配置文件

    spark-env.sh

    #注释掉如下内容:
    #SPARK_MASTER_HOST=hadoop102
    #SPARK_MASTER_PORT=7077
    
    #添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:
    export SPARK_DAEMON_JAVA_OPTS="
    -Dspark.deploy.recoveryMode=ZOOKEEPER 
    -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104 
    -Dspark.deploy.zookeeper.dir=/spark"
    
    #添加如下代码
    #Zookeeper3.5的AdminServer默认端口是8080,和Spark的WebUI冲突
    export SPARK_MASTER_WEBUI_PORT=8989
    
  4. 重新分发修改后的文件

    xsync spark-env.sh
    
  5. 重启集群

    sbin/start-all.sh
    sbin/start-history-server.sh
    
  6. 在hadoop103上启动master

    sbin/start-master.sh
    
  7. 通过hadoop103:8989访问测试

  8. 通过jps查看进程状态

  9. kill掉hadoop102的master进程测试

Spark HA集群访问

bin/spark-shell \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2g \
--total-executor-cores 2

参数:–master spark://hadoop102:7077指定要连接的集群的master

注:一旦配置了高可用以后,master后面要连接多个master

运行模式

根据Driver程序的运行位置分为如下两种模式

  • standalone-client(默认模式)

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077,hadoop103:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --deploy-mode client \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
  • standalone-cluster

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077,hadoop103:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

客户端模式的计算结果将打印在本地,集群模式只能在web页面中找到

Spark集群的三种模式

3.2 Yarn模式安装部署

  1. 解压安装包到指定位置

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
    mv spark-3.0.0-bin-hadoop3.2/ spark-yarn
    
  2. 修改配置文件

    修改hadoop中的yarn-site.xml,分发并重启hadoop集群

    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
    <property>
         <name>yarn.nodemanager.pmem-check-enabled</name>
         <value>false</value>
    </property>
    
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
    <property>
         <name>yarn.nodemanager.vmem-check-enabled</name>
         <value>false</value>
    </property>
    

    spark-env.sh

    # 修改/opt/module/spark-yarn/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径
    YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
    
  3. 启动spark集群并测试

    sbin/start-history-server.sh
    sbin/start-all.sh
    
    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

配置历史服务器

  1. 修改配置文件

    spark-defaults.conf

    #写
    spark.eventLog.enabled          true
    spark.eventLog.dir              hdfs://hadoop102:8020/directory
    
    #读
    spark.yarn.historyServer.address=hadoop102:18080
    spark.history.ui.port=18080
    

    spark-env.sh

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080 
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
    -Dspark.history.retainedApplications=30"
    

    # 参数1含义:WEBUI访问的端口号为18080

    # 参数2含义:指定历史服务器日志存储路径(读)

    # 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  2. 重新分发修改的配置文件

    xsync spark-defaults.conf spark-env.sh
    
  3. 重启spark及其历史服务器

    sbin/stop-history-server.sh
    sbin/stop-all.sh
    
    sbin/start-all.sh
    sbin/start-history-server.sh
    
  4. 提交任务到Yarn

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
  5. Web页面查看日志:http://hadoop103:8088/cluster,点击history进入hadoop102:18080

运行模式

Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster,适用于生产环境。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

如果在yarn日志端无法查看到具体的日志,则在yarn-site.xml中添加如下配置并启动Yarn历史服务器

4、WordCount案例

  1. Maven依赖和scala打包插件

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
    
    <build>
    	<finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                       <goals>
                          <goal>compile</goal>
                          <goal>testCompile</goal>
                       </goals>
                    </execution>
                 </executions>
            </plugin>
            
            
            <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
        </plugins>
    </build>
    
  2. 代码

    package com.atguigu.spark
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
        def main(args: Array[String]): Unit = {
    
            //1.创建SparkConf并设置App名称
            val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
    
            //2.创建SparkContext,该对象是提交Spark App的入口
            val sc = new SparkContext(conf)
    
            //3.读取指定位置文件:hello atguigu atguigu
            val lineRdd: RDD[String] = sc.textFile("input")
    
            //4.读取的一行一行的数据分解成一个一个的单词(扁平化)(hello)(atguigu)(atguigu)
            val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
    
            //5. 将数据转换结构:(hello,1)(atguigu,1)(atguigu,1)
            val wordToOneRdd: RDD[(String, Int)] = wordRdd.map(word => (word, 1))
    
            //6.将转换结构后的数据进行聚合处理 atguigu:1、1 =》1+1  (atguigu,2)
            val wordToSumRdd: RDD[(String, Int)] = wordToOneRdd.reduceByKey((v1, v2) => v1 + v2)
    
            //7.将统计结果采集到控制台打印
            val wordToCountArray: Array[(String, Int)] = wordToSumRdd.collect()
            wordToCountArray.foreach(println)
    
            //一行搞定
            //sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(args(1))
    
            //8.关闭连接
            sc.stop()
        }
    }
    
  3. log4j.properties

    log4j.rootCategory=ERROR, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
    
    # Set the default spark-shell log level to ERROR. When running the spark-shell, the
    # log level for this class is used to overwrite the root logger's log level, so that
    # the user can have different defaults for the shell and regular Spark apps.
    log4j.logger.org.apache.spark.repl.Main=ERROR
    
    # Settings to quiet third party logs that are too verbose
    log4j.logger.org.spark_project.jetty=ERROR
    log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
    log4j.logger.org.apache.parquet=ERROR
    log4j.logger.parquet=ERROR
    
    # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
    log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
    log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
    
上一篇:org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题【后记】


下一篇:org.springframework.util.CollectionUtils