Apache Spark 2.x Machine Learning Cookbook(2) ---使用spark学习线性代数

在本章中,我们将介绍以下内容:
包导入和向量和矩阵的初始设置
使用Spark 2.0创建DenseVector并进行设置
使用Spark 2.0创建SparseVector并进行设置
使用Spark 2.0创建DenseMatrix并进行设置
在Spark 2.0中使用稀疏局部矩阵
使用Spark 2.0执行矢量算术
使用Spark 2.0执行矩阵算术
Spark 2.0 ML库中的分布式矩阵
在Spark 2.0中探索RowMatrix
在Spark 2.0中探索分布式IndexedRowMatrix
探索Spark 2.0中的分布式CoordinateMatrix
探索Spark 2.0中的分布式BlockMatrix

线性代数是机器学习(ML)和数学编程(MP)的基石。在处理Spark的机器库时,必须了解Scala提供的Vector / Matrix结构(默认情况下已导入)与Spark提供的Spark ML,MLlib Vector,Matrix功能不同。如果要立即使用Spark(即并行性)进行大规模矩阵/矢量计算(例如,具有更高数值精度的SVD实现替代方案),则由RDD支持的后者是所需的数据结构。在某些情况下用于衍生产品定价和风险分析)。 Scala向量/矩阵库提供了丰富的线性代数运算集,例如点积,加法运算等,它们在ML管道中仍然有自己的位置。总而言之,使用Scala Breeze和Spark或Spark ML的主要区别在于Spark功能由RDD支持,该功能允许同时进行分布式,并发计算和弹性,而无需任何额外的并发模块或额外的工作(例如,Akka +breeze)。

几乎所有的机器学习算法都使用某种形式的分类或回归机制(不一定是线性的)来训练模型,然后通过将训练输出与实际输出进行比较来最小化误差。 例如,Spark中推荐系统的任何实现都将严重依赖于矩阵分解,分解,近似或单值分解(SVD)。 处理大型数据集降维的另一个感兴趣的机器学习领域是主成分分析(PCA),它主要依赖于线性代数,因式分解和矩阵处理。

当我们在Spark 1.x.x中首次检查Spark ML和MLlib算法的源代码时,我们很快注意到Vectors和Matrices使用RDD作为许多重要算法的基础。

当我们重新查看Spark 2.0和机器学习库的源代码时,我们注意到一些有趣的更改,这些更改需要在以后进行考虑。 这是从Spark 1.6.2到Spark 2.0.0的此类更改的示例,该更改影响了我们使用Spark的一些线性代数代码:

在以前的版本(Spark 1.6.x)中,您可以转换DenseVector或SparseVector(请参阅https:/ / spark.apache.org/docs/ 1.5。2 / api \ / 通过使用toBreeze()函数直接访问https: //spark. apache. org/docs/1. 5. 2/api/java/org/apache/spark/mllib/linalg/Vectors.html),如以下代码库所示:

在Spark 2.0中,toBreeze()函数不仅已更改为asBreeze(),而且已将其降级为私有函数。

为了解决这个问题,请使用以下代码片段之一将前面的向量转换为常用的BreezeVector实例:

在Spark 2.0中,toBreeze()函数不仅已更改为asBreeze(),而且已将其降级为私有函数。
为了解决这个问题,请使用以下代码片段之一将前面的向量转换为常用的BreezeVector实例:

Scala是一种简洁的语言,面向对象和功能编程范例可以共存而不会发生冲突。 尽管在机器学习范式中,函数式编程是首选,但在以后的阶段使用面向对象的方法进行初始数据收集和表示并没有错。

就大规模分布式矩阵而言,我们的经验表明,在处理大型矩阵集10次9方至10次13方至10次27方等时,您必须更深入地研究分布式中固有的网络操作和混排操作。 根据我们的经验,当大规模操作时,局部和分布式矩阵/向量运算(例如,点积,乘法等)的组合效果最佳。

下图描述了可用的Spark向量和矩阵的分类:

spark vector and matrices:

              local vector  dense 

                                  sparse

                      matrix

             distributed

                         rowmatrix

                         indexrowmatrix

                         coordinatematrix

                         blockmatrix

包导入和向量和矩阵的初始设置

在进行Spark编程或使用矢量和矩阵工件之前,我们需要首先导入正确的程序包,然后设置SparkSession,以便获得对群集句柄的访问权限。在这个简短的食谱中,我们重点介绍了可以涵盖Spark中大多数线性代数运算的大量软件包。 随后的各个配方将包括特定程序所需的确切子集。

package chpater02

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.rdd._
import org.apache.spark.mllib.linalg._
import breeze.linalg.{DenseVector => BreezeVector}
import Array._
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.log4j.Logger
import org.apache.log4j.Level

object MyVectorMatrix {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // setup SparkSession to use for interactions with Spark
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("myVectorMatrix")
      .config("spark.sql.warehouse.dir", ".")
      .getOrCreate()


    val xyz = Vectors.dense("2".toDouble, "3".toDouble, "4".toDouble)
    println(xyz)

    val CustomerFeatures1: Array[Double] = Array(1,3,5,7,9,1,3,2,4,5,6,1,2,5,3,7,4,3,4,1)
    val CustomerFeatures2: Array[Double] = Array(2,5,5,8,6,1,3,2,4,5,2,1,2,5,3,2,1,1,1,1)
    val ProductFeatures1: Array[Double]  = Array(0,1,1,0,1,1,1,0,0,1,1,1,1,0,1,2,0,1,1,0)

    val x = Vectors.dense(CustomerFeatures1)
    val y = Vectors.dense(CustomerFeatures2)
    val z = Vectors.dense(ProductFeatures1)

    val a = new BreezeVector(x.toArray)//x.asBreeze
    val b = new BreezeVector(y.toArray)//y.asBreeze
    val c = new BreezeVector(z.toArray)//z.asBreeze

    val NetCustPref = a+b
    val dotprod = c.dot(NetCustPref)

    println("Net Customer Preference calculated by Scala Vector operations = \n",NetCustPref)
    println("Customer Pref DOT Product calculated by Scala Vector operations =",dotprod)

    val a2=a.toDenseVector
    val b2=b.toDenseVector
    val c2=c.toDenseVector

    val NetCustPref2 = NetCustPref.toDenseVector
    println("Net Customer Pref converted back to Spark Dense Vactor =",NetCustPref2)

    val denseVec1 = Vectors.dense(5,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,8,9)
    val sparseVec1 = Vectors.sparse(20, Array(0,2,18,19), Array(5, 3, 8,9))

    println(denseVec1.size)
    println(denseVec1.numActives)
    println(denseVec1.numNonzeros)
    println("denceVec1 presentation = ",denseVec1)

    println(sparseVec1.size)
    println(sparseVec1.numActives)
    println(sparseVec1.numNonzeros)
    println("sparseVec1 presentation = ",sparseVec1)

    //println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    val ConvertedDenseVect : DenseVector= sparseVec1.toDense
    val ConvertedSparseVect : SparseVector= denseVec1.toSparse
    println("ConvertedDenseVect =", ConvertedDenseVect)
    println("ConvertedSparseVect =", ConvertedSparseVect)

    println("Sparse Vector Representation = ",sparseVec1)
    println("Converting Sparse Vector back to Dense Vector",sparseVec1.toDense)

    println("Dense Vector Representation = ",denseVec1)
    println("Converting Dense Vector to Sparse Vector",denseVec1.toSparse)

    // Spark Example
    // 23.0 34.3 21.3
    // 11.0 33.0 22.6
    // 17.0 24.5 22.2
    // will be Stored as 23.0, 11.0, 17.0, 34.3, 33.0, 24.5, 21.3,22.6,22.2

    val denseMat1 = Matrices.dense(3,3,Array(23.0, 11.0, 17.0, 34.3, 33.0, 24.5, 21.3,22.6,22.2))

    val MyArray1= Array(10.0, 11.0, 20.0, 30.3)
    val denseMat3 = Matrices.dense(2,2,MyArray1)

    println("denseMat1=",denseMat1)
    println("denseMat3=",denseMat3)

    val v1 = Vectors.dense(5,6,2,5)
    val v2 = Vectors.dense(8,7,6,7)
    val v3 = Vectors.dense(3,6,9,1)
    val v4 = Vectors.dense(7,4,9,2)

    val Mat11 = Matrices.dense(4,4,v1.toArray ++ v2.toArray ++ v3.toArray ++ v4.toArray)
    println("Mat11=\n", Mat11)

    println("Number of Columns=",denseMat1.numCols)
    println("Number of Rows=",denseMat1.numRows)
    println("Number of Active elements=",denseMat1.numActives)
    println("Number of Non Zero elements=",denseMat1.numNonzeros)
    println("denseMat1 representation of a dense matrix and its value=\n",denseMat1)

    val sparseMat1= Matrices.sparse(3,2 ,Array(0,1,3), Array(0,1,2), Array(11,22,33))
    println("Number of Columns=",sparseMat1.numCols)
    println("Number of Rows=",sparseMat1.numRows)
    println("Number of Active elements=",sparseMat1.numActives)
    println("Number of Non Zero elements=",sparseMat1.numNonzeros)
    println("sparseMat1 representation of a sparse matrix and its value=\n",sparseMat1)

    /*
    From Manual pages of Apache Spark to use as an example to Define Matrices.sparse()
    1.0 0.0 4.0
    0.0 3.0 5.0
    2.0 0.0 6.0
    [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices=[0, 2, 1, 0, 1, 2], colPointers=[0, 2, 3, 6]
    */
    val sparseMat33= Matrices.sparse(3,3 ,Array(0, 2, 3, 6) ,Array(0, 2, 1, 0, 1, 2),Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
    println(sparseMat33)
    val denseFeatureVector= Vectors.dense(1,2,1)

    val result0 = sparseMat33.multiply(denseFeatureVector)
    println("SparseMat33 =", sparseMat33)
    println("denseFeatureVector =", denseFeatureVector)
    println("SparseMat33 * DenseFeatureVector =", result0)

    //println("*****************************************************************************")
    val denseVec13 = Vectors.dense(5,3,0)
    println("denseVec2 =", denseVec13)
    println("denseMat1 =", denseMat1)
    val result3= denseMat1.multiply(denseVec13)
    println("denseMat1 * denseVect13 =", result3)

    val transposedMat1= sparseMat1.transpose
    println("Original sparseMat1 =", sparseMat1)
    println("transposedMat1=",transposedMat1)

    val transposedMat2= denseMat1.transpose
    println("Original sparseMat1 =", denseMat1)
    println("transposedMat2=" ,transposedMat2)

    println("================================================================================")

    val denseMat33: DenseMatrix= new DenseMatrix(3, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0,7.0,8.0,9.0))
    val identityMat33: DenseMatrix = new DenseMatrix(3, 3, Array(1.0, 0.0, 0.0, 0.0,1.0,0.0,0.0,0.0,1.0))
    val result2 =denseMat33.multiply(identityMat33)
    println(result2)

    println(denseMat33.multiply(denseMat33)) // proof in action:  it is not symmetrical:  aTa not equal a

    println("denseMat33 =", denseMat33)
    println("Matrix transposed twice", denseMat33.transpose.transpose)
    println("denseMat33 =", denseMat33)

    /* Vector arithmetic */
    val w1 = Vectors.dense(1,2,3)
    val w2 = Vectors.dense(4,-5,6)
    val w3 = new BreezeVector(w1.toArray)//w1.asBreeze
    val w4=  new BreezeVector(w2.toArray)// w2.asBreeze
    println("w3 + w4 =",w3+w4)
    println("w3 - w4 =",w3+w4)
    println("w3 * w4 =",w3.dot(w4))
    val sv1 = Vectors.sparse(10, Array(0,2,9), Array(5, 3, 13))
    val sv2 = Vectors.dense(1,0,1,1,0,0,1,0,0,13)
    println("sv1 - Sparse Vector = ",sv1)
    println("sv2 - Dense  Vector = ",sv2)
    //    println("sv1  * sve2  =", sv1.asBreeze.dot(sv2.asBreeze))
    println("sv1  * sv2  =", new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray)))


    // Matrix multipication
    val dMat1: DenseMatrix= new DenseMatrix(2, 2, Array(1.0, 3.0, 2.0, 4.0))
    val dMat2: DenseMatrix = new DenseMatrix(2, 2, Array(2.0,1.0,0.0,2.0))
    println("dMat1 =",dMat1)
    println("dMat2 =",dMat2)
    println("dMat1 * dMat2 =", dMat1.multiply(dMat2)) //A x B
    println("dMat2 * dMat1 =", dMat2.multiply(dMat1)) //B x A   not the same as A xB

    val m = new RowMatrix(spark.sparkContext.parallelize(Seq(Vectors.dense(4, 3), Vectors.dense(3, 2))))
    val svd = m.computeSVD(2, true)
    val v = svd.V
    val sInvArray = svd.s.toArray.toList.map(x => 1.0 / x).toArray
    val sInverse = new DenseMatrix(2, 2, Matrices.diag(Vectors.dense(sInvArray)).toArray)
    val uArray = svd.U.rows.collect.toList.map(_.toArray.toList).flatten.toArray
    val uTranspose = new DenseMatrix(2, 2, uArray) // already transposed because DenseMatrix has a column-major orientation
    val inverse = v.multiply(sInverse).multiply(uTranspose)
    // -1.9999999999998297  2.999999999999767
    // 2.9999999999997637   -3.9999999999996767
    println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    println(inverse)


    val dataVectors = Seq(
      Vectors.dense(0.0, 1.0, 0.0),
      Vectors.dense(3.0, 1.0, 5.0),
      Vectors.dense(0.0, 7.0, 0.0)
    )

    val identityVectors = Seq(
      Vectors.dense(1.0, 0.0, 0.0),
      Vectors.dense(0.0, 1.0, 0.0),
      Vectors.dense(0.0, 0.0, 1.0)
    )

    val dd = dataVectors.map(x => x.toArray).flatten.toArray
    dd.foreach(println(_))

    val dm00: Matrix = Matrices.dense(3, 3, dd)
    print("==============================")
    print("\n", dm00)

    val distMat33 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))

    println("distMatt33 columns - Count =", distMat33.computeColumnSummaryStatistics().count)
    println("distMatt33 columns - Mean =", distMat33.computeColumnSummaryStatistics().mean)
    println("distMatt33 columns - Variance =", distMat33.computeColumnSummaryStatistics().variance)
    println("distMatt33 columns - CoVariance =", distMat33.computeCovariance())

    val distMatIdent33 = new RowMatrix(spark.sparkContext.parallelize(identityVectors))

    val flatArray = identityVectors.map(x => x.toArray).flatten.toArray
    dd.foreach(println(_))

    //flaten it so we can use it in Matrices.dense API call
    val dmIdentity: Matrix = Matrices.dense(3, 3, flatArray)

    val distMat44 = distMat33.multiply(dmIdentity)
    println("distMatt44 columns - Count =", distMat44.computeColumnSummaryStatistics().count)
    println("distMatt44 columns - Mean =", distMat44.computeColumnSummaryStatistics().mean)
    println("distMatt44 columns - Variance =", distMat44.computeColumnSummaryStatistics().variance)
    println("distMatt44 columns - CoVariance =", distMat44.computeCovariance())

    val distInxMat1 = spark.sparkContext.parallelize( List( IndexedRow( 0L, dataVectors(0)), IndexedRow( 1L, dataVectors(1)), IndexedRow( 1L, dataVectors(2))))

    println("distinct elements=", distInxMat1.distinct().count())

    val CoordinateEntries = Seq(
      MatrixEntry(1, 6, 300),
      MatrixEntry(3, 1, 5),
      MatrixEntry(1, 7, 10)
    )

    val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
    println("First Row (MarixEntry) =",distCordMat1.entries.first())

    val distBlkMat1 =  distCordMat1.toBlockMatrix().cache()
    distBlkMat1.validate()
    println("Is block empty =", distBlkMat1.blocks.isEmpty())

    spark.stop()
  }

}

 

上一篇:SQL中的rank(),dense_rank(),row_number()


下一篇:tf.layers.dense()