Saturday, 29 August 2015

Useful Spark Statements in ML

Some useful Spark (Scala) statements and functions for machine learning.

1. Build a map from movie ID to title from a pipe-delimited movies file:

val titles = movies.map(line => line.split("\\|").take(2)).map(array =>
                     (array(0).toInt, array(1))).collectAsMap()

2. Key the ratings RDD by user ID and look up all ratings made by user 789:

val moviesForUser = ratings.keyBy(_.user).lookup(789)
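
Combining 1 and 2 (illustrative only; assumes ratings holds MLlib Rating objects with user, product and rating fields), the user's ten highest-rated movies can be printed by title:

moviesForUser.sortBy(-_.rating).take(10)
  .map(rating => (titles(rating.product), rating.rating))
  .foreach(println)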

3. Take the top K (id, similarity) pairs from a similarity RDD, ordered by the similarity score:

val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double]
                               { case (id, similarity) => similarity })
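
For context, sims would hold (item ID, similarity) pairs. A minimal sketch of a cosine similarity function that could be used to build such an RDD (the plain-array factor representation here is an assumption):

// cosine similarity between two factor vectors
def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dot / (normA * normB)
}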

4. 1-of-k encoding of a categorical feature (map each distinct category to an index):
val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap

5. Construct a feature vector that includes the 1-of-k encoded categorical feature:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val numCategories = categories.size

val dataCategories = records.map { r =>
    // strip quotes, then extract the label from the last column
    val trimmed = r.map(_.replaceAll("\"", ""))
    val label = trimmed(r.size - 1).toInt
    // 1-of-k encode the categorical column using the mapping from step 4
    val categoryIdx = categories(r(3))
    val categoryFeatures = Array.ofDim[Double](numCategories)
    categoryFeatures(categoryIdx) = 1.0
    // remaining numeric features, with missing values ("?") set to 0.0
    val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
    val features = categoryFeatures ++ otherFeatures
    LabeledPoint(label, Vectors.dense(features))
}


6. Standardize the feature vectors:

import org.apache.spark.mllib.feature.StandardScaler

val scalerCats = new StandardScaler(withMean = true, withStd = true).
                          fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp =>
               LabeledPoint(lp.label, scalerCats.transform(lp.features)))
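
As a quick check (illustrative only), compare the first feature vector before and after scaling:

println(dataCategories.first.features)   // raw features
println(scaledDataCats.first.features)   // standardized features (zero mean, unit variance)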

7. Represent the feature vectors as a distributed matrix to compute statistics
on the columns of the matrix.

import org.apache.spark.mllib.linalg.distributed.RowMatrix
val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()
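
matrixSummary is a MultivariateStatisticalSummary, so the per-column statistics can be read off directly, for example:

println(matrixSummary.mean)
println(matrixSummary.min)
println(matrixSummary.max)
println(matrixSummary.variance)
println(matrixSummary.numNonzeros)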

8. Shell commands to inspect and then remove the header line of a CSV file:
$ head -1 hour.csv
$ sed 1d hour.csv > hour_noheader.csv
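
The headerless file can then be loaded as an RDD of split records (the comma delimiter is an assumption about the file format):

val rawData = sc.textFile("hour_noheader.csv")
val rows = rawData.map(line => line.split(","))
println(rows.first.mkString(", "))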

9. Calculate the squared Euclidean distance between each point and its cluster centre (summing these over all points gives the within-cluster sum of squares). Breeze is used for linear algebra and vector-based numerical functions.

import breeze.linalg._
import breeze.numerics.pow

def computeDistance(v1: DenseVector[Double], v2: DenseVector[Double]) =
  pow(v1 - v2, 2).sum

val titlesWithFactors = titlesAndGenres.join(movieFactors)
val moviesAssigned = titlesWithFactors.map { case (id, ((title, genres), vector)) =>
  // assign each movie to a cluster and compute its distance from that cluster's centre
  val pred = movieClusterModel.predict(vector)
  val clusterCenter = movieClusterModel.clusterCenters(pred)
  val dist = computeDistance(DenseVector(clusterCenter.toArray),
    DenseVector(vector.toArray))
  (id, title, genres.mkString(" "), pred, dist)
}

val clusterAssignments = moviesAssigned.groupBy{
 case (id, title, genres, cluster, dist) => cluster}.collectAsMap
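
To interpret the clusters, one option (illustrative sketch, using the same tuple fields as above) is to print the movies closest to each cluster centre:

for ((k, v) <- clusterAssignments.toSeq.sortBy(_._1)) {
  println(s"Cluster $k:")
  val closest = v.toSeq.sortBy(_._5).take(5)
  println(closest.map { case (_, title, genres, _, dist) => (title, genres, dist) }.mkString("\n"))
}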



10. Use wholeTextFiles to operate on entire files at once rather than on individual lines.

val rdd = sc.wholeTextFiles(path)
// strip the "file:" URI scheme prefix that wholeTextFiles puts on local paths
val files = rdd.map { case (filename, content) =>
  filename.replace("file:", "") }
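
For example (illustrative only; the directory layout is an assumption), count the files and derive a label from each file's parent directory:

println(files.count)
val labels = rdd.map { case (filename, content) =>
  filename.split("/").takeRight(2).head }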

11. Tokenize text features.

// regex, stopwords and rareTokens are assumed to be defined earlier
// (see the sketch below for example definitions)
def tokenize(line: String): Seq[String] = {
  line.split("""\W+""")
    .map(_.toLowerCase)
    .filter(token => regex.pattern.matcher(token).matches)
    .filterNot(token => stopwords.contains(token))
    .filterNot(token => rareTokens.contains(token))
    .filter(token => token.size >= 2)
    .toSeq
}
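
A minimal sketch of the helpers the function relies on; the exact pattern and word lists are assumptions, not the original values:

// keep only tokens containing no digits
val regex = """[^0-9]*""".r
// a small illustrative stop-word list
val stopwords = Set("the", "a", "an", "of", "or", "in", "for", "by", "on",
  "but", "is", "not", "with", "as", "was", "if", "they", "are", "this",
  "and", "it", "have", "from", "at", "my", "be", "that", "to")
// e.g. tokens that occur only once across the corpus
val rareTokens = Set("someraretoken")

tokenize("The quick brown fox jumped over 2 lazy dogs!")
// expected tokens: quick, brown, fox, jumped, over, lazy, dogs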


12. Stateful streaming using updateStateByKey:

def updateState(prices: Seq[(String, Double)],
                currentTotal: Option[(Int, Double)]): Option[(Int, Double)] = {
  // revenue and number of purchases in the current batch
  val currentRevenue = prices.map(_._2).sum
  val currentNumberPurchases = prices.size
  // previous running (count, revenue) state, defaulting to (0, 0.0)
  val state = currentTotal.getOrElse((0, 0.0))
  Some((currentNumberPurchases + state._1,
    currentRevenue + state._2))
}

val users = events.map {case (user, product, price) =>
  (user, (product, price))}
val revenuePerUser = users.updateStateByKey(updateState)  
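
updateStateByKey requires checkpointing to be enabled. A minimal sketch of the surrounding StreamingContext setup (the input source, batch interval and checkpoint path are assumptions):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/streaming-state")   // required by updateStateByKey

// events would come from an input DStream, for example:
// val events = ssc.socketTextStream("localhost", 9999).map { line =>
//   val Array(user, product, price) = line.split(",")
//   (user, product, price.toDouble)
// }

revenuePerUser.print()
ssc.start()
ssc.awaitTermination()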
