The program reads parquet files into LabeledPoints, trains a logistic regression model on them, and then evaluates the model's performance.
/**
 * Reads one or more parquet datasets, unions them into a single DataFrame and
 * converts every row into a LabeledPoint.
 *
 * Column layout as consumed by this code: the last column is the label and the
 * columns selected by `slice(1, n - 2)` are the features.
 * NOTE(review): `slice` is end-exclusive, so both column 0 and column n-2 are
 * skipped -- confirm that dropping column n-2 is intended.
 * The selected columns are cast to Double, so they are assumed to hold Double
 * values -- verify against the parquet schema.
 *
 * @param sqlContext SQL context used to read the parquet files
 * @param inputPath  paths of the parquet datasets; must be non-empty because the
 *                   DataFrames are combined with `reduceLeft`
 * @return the rows of all inputs as labeled points
 */
def convertParquet2LP(sqlContext: SQLContext, inputPath: Seq[String]) = {
  // Read each path (the original referenced an undefined `hdfsFullPath` instead
  // of the lambda parameter) and union everything into one DataFrame.
  val data = inputPath
    .map { path => sqlContext.read.parquet(path) }
    .reduceLeft { _.unionAll(_) }

  val labeledPoints = data.map { row =>
    val rowSeq = row.toSeq
    val len = rowSeq.size
    val features = rowSeq.slice(1, len - 2).asInstanceOf[Seq[Double]].toArray[Double]
    LabeledPoint(rowSeq(len - 1).asInstanceOf[Double], Vectors.dense(features))
  }
  labeledPoints
}

/** Set up and train the LR model. */
val trainLP = convertParquet2LP(sqlContext, lrConfig.inputTrainPath)
val testLP = convertParquet2LP(sqlContext, lrConfig.inputTestPath)
val lr = new LogisticRegression()

/** Set ParamMap: these values are passed to this particular `fit` call. */
val paramMap = ParamMap(lr.maxIter -> 20)
paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55)

val lrModel = lr.fit(trainLP.toDF(), paramMap)
println("Model was fit using parameters: " + lrModel.parent.extractParamMap())
println("Model intercept: " + lrModel.intercept + " and weights: " + lrModel.weights)

/** Apply the LR model to the test dataset, keeping (prediction, label) pairs. */
val predictionAndLabels = lrModel.transform(testLP.toDF())
  .select("features", "label", "probability", "prediction")
  .map {
    case Row(features: Vector, label: Double, pro: Vector, prediction: Double) =>
      (prediction, label)
  }

/** Performance analysis. */
val metrics = new MulticlassMetrics(predictionAndLabels)
// Spark's confusion matrix has actual classes in rows and predicted classes in
// columns, ordered by ascending label. Assuming the labels are 0.0 (negative)
// and 1.0 (positive), index 1 is the positive class.
val confusion = metrics.confusionMatrix
val tp = confusion(1, 1)
val fp = confusion(0, 1)
val fn = confusion(1, 0)
val tn = confusion(0, 0)
// NOTE(review): the no-argument precision/recall on MulticlassMetrics are
// aggregated over all labels rather than computed for the positive class --
// confirm this is the figure you want to report.
println("Precision = " + metrics.precision + " Recall = " + metrics.recall)
// The matrix entries are Doubles holding counts; convert them so %d can format
// them, and actually apply the format string instead of printing a tuple.
println("TP: %d, FP: %d, FN: %d, TN: %d".format(tp.toLong, fp.toLong, fn.toLong, tn.toLong))

/** Cross validation. */
val crossVal = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator())
// NOTE(review): BinaryClassificationEvaluator presumably scores on the raw
// prediction column (area under ROC by default), in which case gridding over
// `threshold` cannot change the selected model -- verify.
val paramGridBuilder = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.maxIter, Array(10, 20, 30))
  .addGrid(lr.threshold, Array(0.55, 0.65))
  .build()
crossVal.setEstimatorParamMaps(paramGridBuilder)
crossVal.setNumFolds(2)

val cvModel = crossVal.fit(trainLP.toDF())
println("Model was fit using parameters: " + cvModel.bestModel.extractParamMap())

// Save and load the model from disk by serializing it as a one-element RDD.
sc.parallelize(Seq(cvModel), 1).saveAsObjectFile(lrConfig.pmmlPath)
val savedModel = sc.objectFile[CrossValidatorModel](lrConfig.pmmlPath).first()

// Score the test set again with the reloaded model.
val predictionAndLabels2 = savedModel.transform(testLP.toDF())
  .select("features", "label", "probability", "prediction")
  .map {
    case Row(features: Vector, label: Double, pro: Vector, prediction: Double) =>
      (prediction, label)
  }
No comments:
Post a Comment