The program reads Parquet files into LabeledPoints, trains a logistic regression model, and then evaluates the model's performance.
/**
 * Loads one or more Parquet datasets and converts every row into a LabeledPoint.
 *
 * Each path is read through `sqlContext` and the results are concatenated
 * with `unionAll`. Column layout used by the conversion:
 *   - the last column (index `len - 1`) is the label,
 *   - columns `1` until `len - 2` (end exclusive) are the features,
 *   - column `0` and column `len - 2` are not read.
 *
 * @param sqlContext context used to read the Parquet files
 * @param inputPath  paths of the Parquet datasets to load; must not be empty
 * @return the rows of all inputs converted to LabeledPoints
 */
def convertParquet2LP(sqlContext: SQLContext, inputPath: Seq[String]) = {
  // reduceLeft has no result for an empty sequence, so fail early with a clear message.
  require(inputPath.nonEmpty, "inputPath must contain at least one parquet path")

  // Read every path (the lambda parameter, not an outer variable) and stack the frames.
  val data = inputPath
    .map { path => sqlContext.read.parquet(path) }
    .reduceLeft(_.unionAll(_))

  val labeledPoints = data.map { row =>
    val rowSeq = row.toSeq
    val len = rowSeq.size
    // NOTE(review): slice's end index is exclusive, so column len-2 is skipped as
    // well as column 0 - confirm that column is really not a feature.
    // Each cell is cast individually; a non-Double column fails here with a
    // ClassCastException rather than being hidden behind an erased Seq cast.
    val features = rowSeq.slice(1, len - 2).map(_.asInstanceOf[Double]).toArray
    LabeledPoint(rowSeq(len - 1).asInstanceOf[Double], Vectors.dense(features))
  }
  labeledPoints
}
/** Build the training and test sets, then fit a logistic regression model. */
val trainLP = convertParquet2LP(sqlContext, lrConfig.inputTrainPath)
val testLP = convertParquet2LP(sqlContext, lrConfig.inputTestPath)
val lr = new LogisticRegression()
/** Fit-time parameter overrides: iteration cap, regularization and decision threshold. */
val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1, lr.threshold -> 0.55)
val lrModel = lr.fit(trainLP.toDF(), paramMap)
println(s"Model was fit using parameters: ${lrModel.parent.extractParamMap()}")
println(s"Model intercept: ${lrModel.intercept} and weights: ${lrModel.weights}")
/** Score the test set with the fitted model and keep (prediction, label) pairs. */
val predictionAndLabels = {
  val scoredTest = lrModel
    .transform(testLP.toDF())
    .select("features", "label", "probability", "prediction")
  // Only the label and the prediction are kept; the two vector columns are
  // matched by type and otherwise ignored.
  scoredTest.map {
    case Row(_: Vector, label: Double, _: Vector, prediction: Double) => (prediction, label)
  }
}
/** Performance analysis: confusion-matrix counts plus overall precision and recall. */
val metrics = new MulticlassMetrics(predictionAndLabels)
// Read the confusion matrix once and index into it, instead of asking for it four times.
val confusion = metrics.confusionMatrix
// NOTE(review): per the MLlib docs the matrix has actual classes in rows and
// predicted classes in columns, ordered by ascending label. Naming cell (0,0)
// "TP" therefore treats the smallest label as the positive class - confirm that
// matches the label encoding of the data.
val tp = confusion(0, 0)
val fp = confusion(1, 0)
val fn = confusion(0, 1)
val tn = confusion(1, 1)
// NOTE(review): the no-argument precision/recall on MulticlassMetrics appear to be
// overall (not per-class) figures - use precision(label)/recall(label) if
// per-class values are wanted.
println("Precision = " + metrics.precision + " Recall = " + metrics.recall)
// println does not interpret format strings: passing several arguments prints a
// tuple. Format explicitly, converting the Double counts to Long for %d.
println("TP: %d, FP: %d, FN: %d, TN: %d".format(tp.toLong, fp.toLong, fn.toLong, tn.toLong))
/** Cross-validation: 2 folds over a 2 x 3 x 2 grid of regParam, maxIter and threshold. */
val paramGridBuilder = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.maxIter, Array(10, 20, 30))
  .addGrid(lr.threshold, Array(0.55, 0.65))
  .build()
// NOTE(review): BinaryClassificationEvaluator presumably scores with a
// threshold-independent ranking metric by default; if so, the threshold axis of
// the grid cannot change the score - confirm before relying on it.
val crossVal = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGridBuilder)
  .setNumFolds(2)
val cvModel = crossVal.fit(trainLP.toDF())
println(s"Model was fit using parameters: ${cvModel.bestModel.extractParamMap()}")
// Persist the cross-validated model as a single-partition object file, read it
// back from the same path, and score the test set with the reloaded model.
val modelRdd = sc.parallelize(Seq(cvModel), 1)
modelRdd.saveAsObjectFile(lrConfig.pmmlPath)
val savedModel = sc.objectFile[CrossValidatorModel](lrConfig.pmmlPath).first()
val predictionAndLabels2 = {
  val rescoredTest = savedModel
    .transform(testLP.toDF())
    .select("features", "label", "probability", "prediction")
  // Keep (prediction, label); the vector columns are matched by type only.
  rescoredTest.map {
    case Row(_: Vector, label: Double, _: Vector, prediction: Double) => (prediction, label)
  }
}
No comments:
Post a Comment