Friday, 11 September 2015

LogisticRegression Implementation in Spark Pipeline

Implemented LogisticRegression with Spark's Pipeline API in MLlib.
The program reads parquet files to LabelPoints. Train and build the model. Then, evaluation the model performance.

def convertParquet2LP(sqlContext: SQLContext, inputPath: Seq[String]) ={

    val data = inputPath.
      map{ path => sqlContext.read.parquet(hdfsFullPath).
      reduceLeft{ _.unionAll(_) }

    val labeledPoints = data.map{ row =>
      val rowSeq = row.toSeq
      val len = rowSeq.size
      val features= rowSeq.slice(1, len-2).asInstanceOf[Seq[Double]].toArray[Double]

      LabeledPoint(rowSeq(len-1).asInstanceOf[Double], Vectors.dense(features))
    }

    labeledPoints
  }


    /**Setup and train LR model**/
    val trainLP = convertParquet2LP(sqlContext, lrConfig.inputTrainPath)
    val testLP = convertParquet2LP(sqlContext, lrConfig.inputTestPath)
    val lr = new LogisticRegression()

    /**Set ParamMap**/
    val paramMap = ParamMap(lr.maxIter -> 20)
    paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55)

    val lrModel = lr.fit(trainLP.toDF(), paramMap)

    println("Model was fit using parameters: " + lrModel.parent.extractParamMap())
    println("Model intercept: "+lrModel.intercept+" and weights: "+lrModel.weights)

    /**Apply LR model on test dataset**/
    val predictionAndLabels = lrModel.transform(testLP.toDF())
      .select("features", "label", "probability", "prediction")
      .map{ case Row(features: Vector, label: Double, pro: Vector,prediction: Double) =>
      (prediction, label)
    }


    /**Performance Analytic**/
    val metrics = new MulticlassMetrics(predictionAndLabels)
    val tp = metrics.confusionMatrix(0,0)
    val fp = metrics.confusionMatrix(1,0)
    val fn = metrics.confusionMatrix(0,1)
    val tn = metrics.confusionMatrix(1,1)

    println("Precision = " + metrics.precision + " Recall = "+metrics.recall)
    println("TP: %d, FP: %d, FN: %d, TN: %d", tp, fp, fn, tn)

  /**Cross Validation**/

    val crossVal = new CrossValidator().setEstimator(lr).
               setEvaluator(new BinaryClassificationEvaluator())

    val paramGridBuilder = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .addGrid(lr.maxIter, Array(10, 20, 30))
      .addGrid(lr.threshold, Array(0.55, 0.65))
      .build()

    crossVal.setEstimatorParamMaps(paramGridBuilder)
    crossVal.setNumFolds(2)

    val cvModel = crossVal.fit(trainLP.toDF())

    println("Model was fit using parameters: " + cvModel.bestModel.
           extractParamMap())

    //Save and Load model from disk
    sc.parallelize(Seq(cvModel), 1).saveAsObjectFile(lrConfig.pmmlPath)

    val savedModel = sc.objectFile[CrossValidatorModel](lrConfig.pmmlPath).first()

    val predictionAndLabels2 = savedModel.transform(testLP.toDF())
      .select("features", "label", "probability", "prediction")
      .map{ case Row(features: Vector, label: Double, pro: Vector, 
       prediction: Double) =>
      (prediction, label)
    }


No comments:

Post a Comment