大数据篇--Spark调优

文章目录

一、算子的合理选择

pom.xml内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huiq</groupId>
    <artifactId>HuiqTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.map和mappartition:

  编写Scala程序模拟使用不同的算子将数据插入到数据中比较不同。

MapMappartitionApp:

package com.huiq.test

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object MapMappartitionApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("MapMappartitionApp")

    val sc = new SparkContext(sparkConf)

    val students = new ListBuffer[String]()
    for (i<-1 to 100) {
      students += "stu: " + i
    }
    val stuRdd = sc.parallelize(students)

    // 需要把students存储到数据库中去
    myMap(stuRdd)

    sc.stop()
  }

  def myMap(rdd: RDD[String]): Unit = {
    rdd.map(x => {
      val connection = DBUtils.getConnection()
      println(connection + "------------>")

      // TODO... 保存数据到数据库中

      DBUtils.returnConnection(connection)
    }).foreach(println)
  }
}

  运行程序我们会发现使用map算子有多少个元素就会创建多少个connection,这种性能肯定是不行的。
大数据篇--Spark调优
  换成mapPartition再运行程序:
大数据篇--Spark调优
总结map是对RDD中的每个元素作用上一个函数(你假设有个rdd有100个分区,每个分区里有1万个元素,你算一下整个过程会开启多少个connection?)。mapPartition是将函数作用到partition之上的(如果遇到要写数据到数据库,一定要选择该模式)。

思考如果分区数量比较少导致一个分区中的数据量很大,这种场景下用mapPartition可能会有资源不够导致类似OOM的问题,遇到这种问题的时候可以手动调整partition的数量来解决,比如上面的代码可以设置成10个分区val stuRdd = sc.parallelize(students, 10)

2.foreach和foreachpartition:

上一篇:21分钟 MySQL 入门教程


下一篇:【C++】比较器的实现,使用lambda