Spark机器学习MLlib系列1(for python)--数据类型,向量,分布式矩阵,API
关键词:Local vector,Labeled point,Local matrix,Distributed matrix,RowMatrix,IndexedRowMatrix,CoordinateMatrix,BlockMatrix。
前言:MLlib支持本地向量和存储在单机上的矩阵,当然也支持被存储为RDD的分布式矩阵。一个有监督的机器学习的例子在MLlib里面叫做标签点。
1. 本地向量
一个本地向量是由整数类型和从0开始的索引存储在单机上
。MLlib支持两种本地向量,稠密向量和稀疏向量。稠密向量由一个浮点数组表示它的的所有值,而一个稀疏矩阵由两个平行的数组组成,索引和值。举个例子,一个向量,(1.0,0.0,3.0)能个用稠密表现为[1.0,0.0,3.0] 或者以稀疏的形式表现为(3,[0,2],[1.0,3.0]),3是这个向量的大小。(本人注解:3为长度,即是元素个数,[0,2]为索引,[1.0,3.0],为值)
1.1MLlib认为如下数据类型是稠密向量:
~NumPys array
~Python list
1.2MLlib认为如下数据类型是稀疏向量:
~MLlib’s SparseVector.
~SciPy’s csc_matrix with a single colum
为了效率,我们推荐使用Numpy arrays ,并使用工厂方法继承Vectors 来创建稀疏矩阵。
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape = (3, 1))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
2.标签点
标签点可以是一个本地的向量,可以是稀疏的也可以是稠密的,总之他们是带有标签的。在MLlib中,标签点用来进行有监督的学习算法。我们使用双精度数来存储一个标签,这样我们既可以用标签点做分类,也可以用来做回归了。对于二分法,一个标签应该不是0就是1。对于多种分类,标签应该是索引从0,1,2,3….
一个标签点用LabelPoint来表示。
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
稀疏数据
在训练中,有一个稀疏训练数据是有一件很平常的事情。MLlib支持读取一个以LIBSVM格式存储训练例子。LIBSVM是LIBSVM和LIBLINEAR默认的格式。这是一种每一行带有一个标签的的稀疏向量格式如下:
label index1:value1 index2:value2 ...
- 1
索引从1开始的升序排列的。当读取完成之后,这些特征索引转化为从0开始。
MLUtils.loadLibSVMFile 读取存储LIBSVM格式的训练模型
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
- 1
- 2
- 3
3.本地矩阵
一个本地矩阵有整数型的行,和双精度的列索引,并且存储在单机上。MLlib支持将所有数据存储在一个单独的数组上并且以列为顺序的稠密矩阵,也支持稀疏矩阵。举个例子,比如像下面的稠密矩阵:
这个矩阵是一个存储在一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]上的大小为(3,2)的矩阵。
本地矩阵的基础类是Matrix,我们提供两个实现函数:DenseMatrix和SparseMatrix。我们推荐Matrices里面的工厂实现方法来创造本地矩阵。记住,MLlib的本地矩阵是列为序存储。
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
4.分布式矩阵
一个分布式矩阵有long类型的行和列,还有double类型的值,并且被分布式存储在一个或者更多的RDDs中。选择正确的格式来存储巨大的分布式矩阵是很重要的事情。将一个分布式矩阵转换可能需要一个全局的清洗,这是代价非常昂贵的。直到现在,四种类型的分布式矩阵已经实现了。
这四种中基础类型叫做 RowMatrix。这种 RowMatrix一种面向行的分布式矩阵,没有所谓的行索引。比如:一个特征向量的集合。它依赖与RDD自己的行,并且RDD的每个行是一个本地向量。对于一个RowMatrix我们假设列的数量不是非常巨大,以便一个单独的本地向量能够合理正确的与驱动交流并且能够存储操作在一个正在使用它的节点上。
IndexedRowMatrix与 RowMatrix除了能被用来区分行和执行合并的行索引不同之外,其他都非常相似。CoordinateMatrix是一种以coordinate list (COO) 格式存储在RDDs条目中的分布式矩阵。
BlockMatrix 是一种被RDDMatrixBlock支持的分布式矩阵,MatrixBlock是元祖(Int, Int, Matrix).
NOTE
潜在的分布式矩阵RDD必须是确定的,因为我们缓存了矩阵的大小,一般来说使用一个非确定性的RDD会导致错误。
RowMatrix
RowMatrix是一个面向行的分布式矩阵,没有所谓的行索引,可以使用RDD的行,这些行是本地向量。既然每个行都被本地向量表示,列的数目被整数范围限制,但是列数目在实际情况应该是比行小的多的。
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
IndexedRowMatrix
IndexedRowMatrix与 RowMatrix除了有意义的行索引外,其他都非常相似。它使用RDD索引行,以便每一行代表它的索引和本地向量。
一个 indexedRowMatrix可以被 indexedRowMatrix 创造出来,一个 indexedRowMatrix以能够被转化为RowMatrix通过去掉行索引。
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows
# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
CoordinateMatrix
CoordinateMatrix是一个分布式矩阵,
并且由RDD的条目支持的。每一个条目就是一个元祖(i: Long, j: Long, value: Double),i是行索引,j 是列索引,value是条目值。CoordinateMatrix应该仅仅使用当矩阵规模特别大并且矩阵很稀疏的时候。
CoordinateMatrix 能够被MatrixEntry条目创建, CoordinateMatrix能被转化为 RowMatrix通过使用toRowMatrix,或者一个稀疏行IndexedRowMatrix通过使用 toIndexedRowMatrix.
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# Create an RDD of coordinate entries.
# - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(6, 1, 3.7)])
# - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
# Create an CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)
# Get its size.
m = mat.numRows() # 3
n = mat.numCols() # 2
# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries
# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
BlockMatrix
BlockMatrix是一个分布式矩阵,并且被 MatrixBlocks支持, MatrixBlocks是一个元祖, ((Int, Int), Matrix),(Int, Int)是块索引,Matrix是rowsPerBlock x colsPerBlock的形状。
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)
# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
原文转自 http://blog.csdn.net/qq_30115765/article/details/52594421