Spark 教程

Spark SQL

Spark 笔记

Spark MLlib

Spark MLlib 数据类型


Spark MLlib 提供了一系列基本数据类型以支持底层的机器学习算法。主要的数据类型包括,本地向量(Local Vector)、标注点(Labeled Point)、本地矩阵(Local Matrix)、分布式矩阵(Distributed Matrix)等。

单机模式存储的本地向量与本地矩阵,以及基于一个或多个 RDD 的分布式矩阵。其中本地向量与本地矩阵作为公共接口提供简单数据模型,底层的线性代数操作由 Breeze 库和 BLAS 库提供。标注点类型用来表示监督学习(Supervised Learning)中的一个训练样本。

本地向量(Local Vector)

本地向量存储在单机上,其拥有整型、从0开始的索引值以及浮点型的元素值。

MLlib 提供了两种类型的本地向量,稠密向量(DenseVector)和稀疏向量(SparseVector)。

稠密向量使用一个双精度浮点型数组来表示其中每一维元素,例如,向量(1.0, 0.0, 3.0)的稠密向量表示形式是[1.0,0.0,3.0]

稀疏向量则是基于一个整型索引数组和一个双精度浮点型的值数组。例如,向量(1.0, 0.0, 3.0)的稀疏向量形式则是(3, [0,2], [1.0, 3.0]),其中,3是向量的长度,[0,2]是向量中非0维度的索引值,表示位置为0、2的两个元素为非零值,而[1.0, 3.0]则是按索引排列的数组元素值。

所有本地向量都以 org.apache.spark.mllib.linalg.Vector 为基类,DenseVector 和 SparseVector 分别是它的两个实现类,创建本地向量的创建方式主要有如下三种方式,也可以用 Vectors 工具类。

import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors

object LocalVectorDemo {

  def main(args: Array[String]) {

    //  创建一个稠密向量
    val dv: linalg.Vector = Vectors.dense(1.0, 0.0, 3.0)

    //  创建一个稀疏本地向量
    //  方法第二个参数数组指定了非零元素的索引,而第三个参数数组则给定了非零元素值
    val sv1: linalg.Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

    //  另一种创建稀疏本地向量的方法
    //  方法的第二个参数是一个序列,其中每个元素都是一个非零值的元组:(index,elem)
    val sv2: linalg.Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

    println(dv, dv.getClass.getCanonicalName)
    println(sv1, sv1.getClass.getCanonicalName)
    println(sv2, sv2.getClass.getCanonicalName)
  }

}

输出如下:

([1.0,0.0,3.0],org.apache.spark.mllib.linalg.DenseVector)
((3,[0,2],[1.0,3.0]),org.apache.spark.mllib.linalg.SparseVector)
((3,[0,2],[1.0,3.0]),org.apache.spark.mllib.linalg.SparseVector)

标注点(Labeled Point)

标注点(LabeledPoint)是一种带有标签(Label/Response)的本地向量,它可以是稠密或者稀疏的。在 MLlib 中,标注点在监督学习算法中被使用。由于标签是用双精度浮点型来存储的,故标注点类型在回归(Regression)分类(Classification)问题上均可使用。例如,对于二分类问题,则正样本的标签为1负样本的标签为0,而对于多类别的分类问题来说,标签则应是一个以0开始的索引序列:0, 1, 2 ...

标注点的实现类是 org.apache.spark.mllib.regression.LabeledPoint,需要注意的是,它与前面介绍的本地向量不同,并不位于 linalg 包下。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LabeledPointDemo {

  def main(args: Array[String]): Unit = {

    // 创建一个标签为1,数据为密集型向量构成的带标签点
    val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

    // 创建一个标签为0,数据为稀疏型向量构成的带标签点
    val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

    println(pos, pos.getClass.getCanonicalName)
    println(neg, neg.getClass.getCanonicalName)

  }
}

输出如下:

((1.0,[1.0,0.0,3.0]),org.apache.spark.mllib.regression.LabeledPoint)
((0.0,(3,[0,2],[1.0,3.0])),org.apache.spark.mllib.regression.LabeledPoint)

稀疏数据

在实际生产中训练数据是稀疏数据很常见。MLlib 支持以 libsvm 格式存储的稀疏矩阵。这是一个 txt 格式文件,其中每一行代表一个打标签的稀疏特征向量,格式如下:

label index1:value1 index2:value2 ...

其中,索引是从1开始的,递增的顺序。加载之后,特征索引就转化为了从0开始的。

MLlib 在 org.apache.spark.mllib.util.MLUtils 工具类中提供了读取 LIBSVM 格式的方法 loadLibSVMFile,其使用非常方便。

本地矩阵(Local Matrix)

本地矩阵具有整型的行、列索引值和双精度浮点型的元素值,它存储在单机上。MLlib 支持稠密矩阵 DenseMatrix 和稀疏矩阵 Sparse Matrix 两种本地矩阵,稠密矩阵将所有元素的值存储在一个列优先(Column-major)的双精度型数组中,而稀疏矩阵则将非零元素以列优先的 CSC(Compressed Sparse Column)模式进行存储。

本地矩阵的基类是 org.apache.spark.mllib.linalg.MatrixDenseMatrix 和 SparseMatrix 均是它的实现类,和本地向量类似,MLlib 也为本地矩阵提供了相应的工具类 Matrices

import org.apache.spark.mllib.linalg.{Matrices, Matrix}

object LocalMatrixDemo {

  def main(args: Array[String]): Unit = {

    //  创建一个3行2列的稠密矩阵[ [1.0,2.0], [3.0,4.0], [5.0,6.0] ]
    //  请注意,这里的数组参数是列先序的!
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

    //  创建一个3行2列的稀疏矩阵[ [9.0,0.0], [0.0,8.0], [0.0,6.0]]
    //  第一个数组参数表示列指针,即每一列元素的开始索引值
    //  第二个数组参数表示行索引,即对应的元素是属于哪一行
    //  第三个数组即是按列先序排列的所有非零元素,通过列指针和行索引即可判断每个元素所在的位置
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

    println(dm)
    println("--------")
    println(sm)
  }

}

输出如下:

1.0  2.0  
3.0  4.0  
5.0  6.0  
--------
3 x 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0

分布式矩阵(Distributed Matrix)

MLlib 在 Vector 和 Matrix 基础上,实现了分布式矩阵类,分布式矩阵的数据分块或者分行存储,并且实现矩阵的基本运算,能够使矩阵分布式计算,比如列统计、相似度、协方差、奇异值分解等。

分布式矩阵由长整型的行列索引值和双精度浮点型的元素值组成。它可以分布式地存储在一个或多个 RDD 上,MLlib 提供了四种分布式矩阵的存储方案:行矩阵(RowMatrix),索引行矩阵(IndexedRowMatrix)、坐标矩阵(CoordinateMatrix)和分块矩阵(BlockMatrix)。它们都属于 org.apache.spark.mllib.linalg.distributed 包。

行矩阵(RowMatrix)

分布式行矩阵就是把每行对应一个 RDD, 将矩阵的每行分布式存储,和矩阵的每行是一个本地向量。这和多变量统计的数据矩阵比较相似。因为每行以一个本地向量表示,所以矩阵列的数量被限制在整数范围内,但是实际应用中列数很小。

索引行矩阵(IndexedRowMatrix)

IndexedRowMatrix 和 RowMatrix 非常相似,区别是它带有有一定意义的行索引。在 RowMatrix 中,rows 的格式是 RDD[Vector];而在 IndexedRowMatrix 中,rows 的格式是 RDD[IndexedRow],其中 IndexedRow 的格式是(index: Long, vector: Vector,相比 RowMatrix 多了一个 index 索引信息。一个 IndexedRowMatrix 可以从 RDD[IndexedRow] 实例创建,IndexedRow 是(Int,Vecton)的 wrapper,而且这种矩阵可以转换成 RowMatrix (通过去掉 index) ,其创建及使用方法类似于 RowMatrix。

坐标矩阵(Coordinate Matrix)

坐标矩阵,每一项都是一个(i: Long,j: Long, value: Double) 指示行列值的元组 tuple。其中 i 是行坐标,j 是列坐标,value 是值。如果矩阵是非常大的而且稀疏,则坐标矩阵一定是最好的选择。坐标矩阵是通过 RDD[MatrixEntry]实例创建的,MatrixEntry 为(long, long.Double)形式。坐标矩阵可以转化为 IndexedRowMatrix。

分块矩阵(Block Matrix)

在数学的矩阵理论中,一个分块矩阵或是分段矩阵就是将矩阵分割出较小的矩形矩阵,这些较小的矩阵就被称为区块。换个方式来说,就是以较小的矩阵组合成一个矩阵。分块矩阵的分割原则是以水平线和垂直线进行划分。在分块矩阵中,位于同一行(列) 的每一个子矩阵,都拥有相同的列数(行数)。

通过将大的矩阵通过分块的方式划分,并将每个分块看作另一个矩阵的元素,这样之后再参与运算,通常可以让计算变得清晰甚至得以大幅简化。例如,有的大矩阵可以通过分块变为对角矩阵或者是三角和矩阵等特殊形式的矩阵。