searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

sparkMLib源码学习笔记

2023-08-01 06:06:44
8
0

一、mllib/src/.../spark/ml/

ML基于DataFrame,而MLlib API基于RDD(https://zhuanlan.zhihu.com/p/161414849

ML提供了一套跨语言以及跨机器学习算法的API

1.1 ann文件夹结构

BreezeUtil: 两个矩阵计算方法

Layer:

维护权重大小、输出大小

createModel、initModel

LayerModel:

权重矩阵

eval、computePrevDelta、grad

AffineLayer extends Layer、AffineLayerModel伴生对象

AffineLayerModel extends LayerModel

FunctionalLayer extends LayerModel

FunctionalLayerModel extends LayerModel

ActivationFunction

eval、derivative

SigmoidFunction extends ActivationFunction

Topology

model

TopologyModel

weights、layers、layerModels

forward、predict、predictRaw、raw2ProbabilityInPlace、computeGradient

FeedForwardTopology extends Topology、FeedForwardTopology伴生对象

FeedForwardModel extends TopologyModel、FeedForwardModel伴生对象

ANNGradient extends Gradient,Gradient来自mllib包

compute

ANNUpdater extends Updater,Updater来自mllib包

compute

ApplyInPlace

apply

DataStacker

stack、unstack

FeedForwardTrainer

train、...

LossFunction

loss

SigmoidLayerWithSquaredError extends Layer

weightSize、inPlace

getOutputSize、createModel、initModel

SigmoidLayerModelWithSquaredError extends FunctionalLayerModel with LossFunction

loss

SoftmaxLayerWithCrossEntropyLoss extends Layer

weightSize、inPlace

getOutputSize、createModel、initModel

SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel

weights

eval、computePrevDelta、grad、loss

1.2 调用链条:

class LinearRegression

train->trainImpl->optimizer.iterations

object MultilayerPerceptronClassifierExample:加载数据文件后按比例分割

(Predictor) trainer.fit -> (MultilayerPerceptronClassifier) train -> (FeedForwardTrainer) train -> (mllib.GradientDescent) optimizeWithLossReturned -> (mllib.GradientDescent) runMiniBatchSGD

val trainer = new MultilayerPerceptronClassifier()
val model = trainer.fit(train)
val result = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val predictionAndLabels = result.select("prediction", "label")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")

MultilayerPerceptronClassifier extends ProbabilisticClassifier[Vector, MultilayerPerceptronClassifier,
MultilayerPerceptronClassificationModel] with MultilayerPerceptronParams with DefaultParamsWritable

setXxx()、setBlockSize(128)

(MultilayerPerceptronClassifier) train :

//数据处理
val encodeModel = new OneHotEncoderModel(uid, Array(labels))
val encodedDataset = encodeModel.transform(dataset) //将数据集的标签数值列转化为onehot格式
val data = encodedDataset.select($(featuresCol), encodedLabelCol).rdd.map {...} //数据集转化为rdd并将Row类型转化为二元元组类型

//模型处理
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)//根据输入输出隐藏层的单元数构建多层感知机
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)//根据感知机模型构建训练器
//设置训练器参数
trainer.setStackSize($(blockSize)) //设置块大小
val (mlpModel, objectiveHistory) = trainer.train(data) //获得训练数据和训练集后开始训练
createModel(dataset, mlpModel.weights, objectiveHistory)//根据数据集和训练好的模型参数构建MultilayerPerceptronClassificationModel

(FeedForwardTrainer) train :

 val trainData = dataStacker.stack(data).map { v =>
      (v._1, OldVectors.fromML(v._2))
    } //堆叠数据,转化为mllib的vectors
if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK) //持久化
sgd.optimizeWithLossReturned(trainData, w) //优化
(topology.model(newWeights), lossHistory) //返回构建好的模型

MulticlassClassificationEvaluator

(ProbabilisticClassificationModel) transform(test)->(FeedForwardModel) predictRaw -> (FeedForwardModel) forward-> 各层模型的eval

二、mllib/src/.../spark/mllib/(旧版本)

2.1 stat 统计模块

Statistics.scala:定义了许多统计方法

correlation:相关系数

distribution:提供多元高斯(正态)分布的基本功能。

2.2 util

DataValidators:在进行机器学习算法之前对数据进行校验

xxGenerator:各算法的训练数据生成类,

MLUtils:数据格式转换工具类

modeSaveLoad:模型保存与加载

NumericParser:数字转换工具类

2.3 linalg

Vectors:定义了Vector、Vectors、DenseVector、SparseVector

Matrixs:定义了Matrix、DenseMatrix、SparseMatrix、Matrices

BLAS:实现类BLAS定义的线性代数矩阵/向量间的运算方法

distribute:分布式矩阵

RowMatrix:一行对应一个RDD,用Vector表示

IndexedRowMatrix:一行加上它的行索引对应一个RDD,用IndexedRow表示,IndexedRow的格式是(index:Long,vector:Vector)

coordinateMatrix:带坐标的元素MatrixEntry为一项,适合表示稀疏矩阵,MatrixEntry的格式是(Long,Long,Double)。

BlockMatrix:一个矩阵块为一个RDD,以RDD[((blockRowIndex,blockColIndex),sub-matrix)]的格式存储

2.4 regression

线性回归

LinearRegressionWithSGD

    • 继承了regression文件夹下的GeneralizedLinearAlgorithm的run方法,run方法中调用了optimizer.optimize
    • LinearRegressionWithSGD的optimizer是optimization文件夹下的GradientDescent类型的,所以调用optimize其实是调用GradientDescent伴生对象的runMiniBatchSGD
    • LinearRegressionWithSGD的gradient的类型是optimization文件夹下的Gradient.scala文件中定义的LeastSquaresGradient,所以调用了该类的compute方法,该方法使用最小二乘法计算梯度值和损失值
    • LinearRegressionWithSGD的updater的类型是optimization文件夹下的Updater.scala文件中定义的SimpleUpdater,所以调用了该类的compute方法
    • 训练完成后就生成了LinearRegressionModel类,该类可以进行模型的预测predict、保存加载等操作

三、

sparkMLlib的底层向量、数值、矩阵计算,用的是Breeze库,但参数传递用的是MLlib自己定义的Vector 。

底层采用BLAS线性代数运算库,分为向量向量、矩阵向量、矩阵矩阵运算。

sparkMLlib的线性回归采用随机梯度下降的方法

0条评论
作者已关闭评论
刘****璇
3文章数
0粉丝数
刘****璇
3 文章 | 0 粉丝
刘****璇
3文章数
0粉丝数
刘****璇
3 文章 | 0 粉丝
原创

sparkMLib源码学习笔记

2023-08-01 06:06:44
8
0

一、mllib/src/.../spark/ml/

ML基于DataFrame,而MLlib API基于RDD(https://zhuanlan.zhihu.com/p/161414849

ML提供了一套跨语言以及跨机器学习算法的API

1.1 ann文件夹结构

BreezeUtil: 两个矩阵计算方法

Layer:

维护权重大小、输出大小

createModel、initModel

LayerModel:

权重矩阵

eval、computePrevDelta、grad

AffineLayer extends Layer、AffineLayerModel伴生对象

AffineLayerModel extends LayerModel

FunctionalLayer extends LayerModel

FunctionalLayerModel extends LayerModel

ActivationFunction

eval、derivative

SigmoidFunction extends ActivationFunction

Topology

model

TopologyModel

weights、layers、layerModels

forward、predict、predictRaw、raw2ProbabilityInPlace、computeGradient

FeedForwardTopology extends Topology、FeedForwardTopology伴生对象

FeedForwardModel extends TopologyModel、FeedForwardModel伴生对象

ANNGradient extends Gradient,Gradient来自mllib包

compute

ANNUpdater extends Updater,Updater来自mllib包

compute

ApplyInPlace

apply

DataStacker

stack、unstack

FeedForwardTrainer

train、...

LossFunction

loss

SigmoidLayerWithSquaredError extends Layer

weightSize、inPlace

getOutputSize、createModel、initModel

SigmoidLayerModelWithSquaredError extends FunctionalLayerModel with LossFunction

loss

SoftmaxLayerWithCrossEntropyLoss extends Layer

weightSize、inPlace

getOutputSize、createModel、initModel

SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel

weights

eval、computePrevDelta、grad、loss

1.2 调用链条:

class LinearRegression

train->trainImpl->optimizer.iterations

object MultilayerPerceptronClassifierExample:加载数据文件后按比例分割

(Predictor) trainer.fit -> (MultilayerPerceptronClassifier) train -> (FeedForwardTrainer) train -> (mllib.GradientDescent) optimizeWithLossReturned -> (mllib.GradientDescent) runMiniBatchSGD

val trainer = new MultilayerPerceptronClassifier()
val model = trainer.fit(train)
val result = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val predictionAndLabels = result.select("prediction", "label")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")

MultilayerPerceptronClassifier extends ProbabilisticClassifier[Vector, MultilayerPerceptronClassifier,
MultilayerPerceptronClassificationModel] with MultilayerPerceptronParams with DefaultParamsWritable

setXxx()、setBlockSize(128)

(MultilayerPerceptronClassifier) train :

//数据处理
val encodeModel = new OneHotEncoderModel(uid, Array(labels))
val encodedDataset = encodeModel.transform(dataset) //将数据集的标签数值列转化为onehot格式
val data = encodedDataset.select($(featuresCol), encodedLabelCol).rdd.map {...} //数据集转化为rdd并将Row类型转化为二元元组类型

//模型处理
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)//根据输入输出隐藏层的单元数构建多层感知机
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)//根据感知机模型构建训练器
//设置训练器参数
trainer.setStackSize($(blockSize)) //设置块大小
val (mlpModel, objectiveHistory) = trainer.train(data) //获得训练数据和训练集后开始训练
createModel(dataset, mlpModel.weights, objectiveHistory)//根据数据集和训练好的模型参数构建MultilayerPerceptronClassificationModel

(FeedForwardTrainer) train :

 val trainData = dataStacker.stack(data).map { v =>
      (v._1, OldVectors.fromML(v._2))
    } //堆叠数据,转化为mllib的vectors
if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK) //持久化
sgd.optimizeWithLossReturned(trainData, w) //优化
(topology.model(newWeights), lossHistory) //返回构建好的模型

MulticlassClassificationEvaluator

(ProbabilisticClassificationModel) transform(test)->(FeedForwardModel) predictRaw -> (FeedForwardModel) forward-> 各层模型的eval

二、mllib/src/.../spark/mllib/(旧版本)

2.1 stat 统计模块

Statistics.scala:定义了许多统计方法

correlation:相关系数

distribution:提供多元高斯(正态)分布的基本功能。

2.2 util

DataValidators:在进行机器学习算法之前对数据进行校验

xxGenerator:各算法的训练数据生成类,

MLUtils:数据格式转换工具类

modeSaveLoad:模型保存与加载

NumericParser:数字转换工具类

2.3 linalg

Vectors:定义了Vector、Vectors、DenseVector、SparseVector

Matrixs:定义了Matrix、DenseMatrix、SparseMatrix、Matrices

BLAS:实现类BLAS定义的线性代数矩阵/向量间的运算方法

distribute:分布式矩阵

RowMatrix:一行对应一个RDD,用Vector表示

IndexedRowMatrix:一行加上它的行索引对应一个RDD,用IndexedRow表示,IndexedRow的格式是(index:Long,vector:Vector)

coordinateMatrix:带坐标的元素MatrixEntry为一项,适合表示稀疏矩阵,MatrixEntry的格式是(Long,Long,Double)。

BlockMatrix:一个矩阵块为一个RDD,以RDD[((blockRowIndex,blockColIndex),sub-matrix)]的格式存储

2.4 regression

线性回归

LinearRegressionWithSGD

    • 继承了regression文件夹下的GeneralizedLinearAlgorithm的run方法,run方法中调用了optimizer.optimize
    • LinearRegressionWithSGD的optimizer是optimization文件夹下的GradientDescent类型的,所以调用optimize其实是调用GradientDescent伴生对象的runMiniBatchSGD
    • LinearRegressionWithSGD的gradient的类型是optimization文件夹下的Gradient.scala文件中定义的LeastSquaresGradient,所以调用了该类的compute方法,该方法使用最小二乘法计算梯度值和损失值
    • LinearRegressionWithSGD的updater的类型是optimization文件夹下的Updater.scala文件中定义的SimpleUpdater,所以调用了该类的compute方法
    • 训练完成后就生成了LinearRegressionModel类,该类可以进行模型的预测predict、保存加载等操作

三、

sparkMLlib的底层向量、数值、矩阵计算,用的是Breeze库,但参数传递用的是MLlib自己定义的Vector 。

底层采用BLAS线性代数运算库,分为向量向量、矩阵向量、矩阵矩阵运算。

sparkMLlib的线性回归采用随机梯度下降的方法

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0