一、mllib/src/.../spark/ml/
ML基于DataFrame,而MLlib API基于RDD(https://zhuanlan.zhihu.com/p/161414849)
ML提供了一套跨语言以及跨机器学习算法的API
1.1 ann文件夹结构
BreezeUtil: 两个矩阵计算方法
Layer:
维护权重大小、输出大小
createModel、initModel
LayerModel:
权重矩阵
eval、computePrevDelta、grad
AffineLayer extends Layer、AffineLayerModel伴生对象
AffineLayerModel extends LayerModel
FunctionalLayer extends LayerModel
FunctionalLayerModel extends LayerModel
ActivationFunction
eval、derivative
SigmoidFunction extends ActivationFunction
Topology
model
TopologyModel
weights、layers、layerModels
forward、predict、predictRaw、raw2ProbabilityInPlace、computeGradient
FeedForwardTopology extends Topology、FeedForwardTopology伴生对象
FeedForwardModel extends TopologyModel、FeedForwardModel伴生对象
ANNGradient extends Gradient,Gradient来自mllib包
compute
ANNUpdater extends Updater,Updater来自mllib包
compute
ApplyInPlace
apply
DataStacker
stack、unstack
FeedForwardTrainer
train、...
LossFunction
loss
SigmoidLayerWithSquaredError extends Layer
weightSize、inPlace
getOutputSize、createModel、initModel
SigmoidLayerModelWithSquaredError extends FunctionalLayerModel with LossFunction
loss
SoftmaxLayerWithCrossEntropyLoss extends Layer
weightSize、inPlace
getOutputSize、createModel、initModel
SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel
weights
eval、computePrevDelta、grad、loss
1.2 调用链条:
class LinearRegression
train->trainImpl->optimizer.iterations
object MultilayerPerceptronClassifierExample:加载数据文件后按比例分割
(Predictor) trainer.fit -> (MultilayerPerceptronClassifier) train -> (FeedForwardTrainer) train -> (mllib.GradientDescent) optimizeWithLossReturned -> (mllib.GradientDescent) runMiniBatchSGD
val trainer = new MultilayerPerceptronClassifier()
val model = trainer.fit(train)
val result = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val predictionAndLabels = result.select("prediction", "label")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
MultilayerPerceptronClassifier extends ProbabilisticClassifier[Vector, MultilayerPerceptronClassifier,
MultilayerPerceptronClassificationModel] with MultilayerPerceptronParams with DefaultParamsWritable
setXxx()、setBlockSize(128)
(MultilayerPerceptronClassifier) train :
//数据处理
val encodeModel = new OneHotEncoderModel(uid, Array(labels))
val encodedDataset = encodeModel.transform(dataset) //将数据集的标签数值列转化为onehot格式
val data = encodedDataset.select($(featuresCol), encodedLabelCol).rdd.map {...} //数据集转化为rdd并将Row类型转化为二元元组类型
//模型处理
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)//根据输入输出隐藏层的单元数构建多层感知机
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)//根据感知机模型构建训练器
//设置训练器参数
trainer.setStackSize($(blockSize)) //设置块大小
val (mlpModel, objectiveHistory) = trainer.train(data) //获得训练数据和训练集后开始训练
createModel(dataset, mlpModel.weights, objectiveHistory)//根据数据集和训练好的模型参数构建MultilayerPerceptronClassificationModel
(FeedForwardTrainer) train :
val trainData = dataStacker.stack(data).map { v =>
(v._1, OldVectors.fromML(v._2))
} //堆叠数据,转化为mllib的vectors
if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK) //持久化
sgd.optimizeWithLossReturned(trainData, w) //优化
(topology.model(newWeights), lossHistory) //返回构建好的模型
MulticlassClassificationEvaluator
(ProbabilisticClassificationModel) transform(test)->(FeedForwardModel) predictRaw -> (FeedForwardModel) forward-> 各层模型的eval
二、mllib/src/.../spark/mllib/(旧版本)
2.1 stat 统计模块
Statistics.scala:定义了许多统计方法
correlation:相关系数
distribution:提供多元高斯(正态)分布的基本功能。
2.2 util
DataValidators:在进行机器学习算法之前对数据进行校验
xxGenerator:各算法的训练数据生成类,
MLUtils:数据格式转换工具类
modeSaveLoad:模型保存与加载
NumericParser:数字转换工具类
2.3 linalg
Vectors:定义了Vector、Vectors、DenseVector、SparseVector
Matrixs:定义了Matrix、DenseMatrix、SparseMatrix、Matrices
BLAS:实现类BLAS定义的线性代数矩阵/向量间的运算方法
distribute:分布式矩阵
RowMatrix:一行对应一个RDD,用Vector表示
IndexedRowMatrix:一行加上它的行索引对应一个RDD,用IndexedRow表示,IndexedRow的格式是(index:Long,vector:Vector)
coordinateMatrix:带坐标的元素MatrixEntry为一项,适合表示稀疏矩阵,MatrixEntry的格式是(Long,Long,Double)。
BlockMatrix:一个矩阵块为一个RDD,以RDD[((blockRowIndex,blockColIndex),sub-matrix)]的格式存储
2.4 regression
线性回归
LinearRegressionWithSGD
- 继承了regression文件夹下的GeneralizedLinearAlgorithm的run方法,run方法中调用了optimizer.optimize
- LinearRegressionWithSGD的optimizer是optimization文件夹下的GradientDescent类型的,所以调用optimize其实是调用GradientDescent伴生对象的runMiniBatchSGD
- LinearRegressionWithSGD的gradient的类型是optimization文件夹下的Gradient.scala文件中定义的LeastSquaresGradient,所以调用了该类的compute方法,该方法使用最小二乘法计算梯度值和损失值
- LinearRegressionWithSGD的updater的类型是optimization文件夹下的Updater.scala文件中定义的SimpleUpdater,所以调用了该类的compute方法
- 训练完成后就生成了LinearRegressionModel类,该类可以进行模型的预测predict、保存加载等操作
三、
sparkMLlib的底层向量、数值、矩阵计算,用的是Breeze库,但参数传递用的是MLlib自己定义的Vector 。
底层采用BLAS线性代数运算库,分为向量向量、矩阵向量、矩阵矩阵运算。
sparkMLlib的线性回归采用随机梯度下降的方法