1. Build a map from movie ID to title:
val titles = movies.map(line => line.split("\\|").take(2))
  .map(array => (array(0).toInt, array(1)))
  .collectAsMap()
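For example, looking up a title by movie ID (ID 123 is just an illustrative value):
println(titles(123))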
2. Find all ratings made by a given user (here 789) using keyBy and lookup:
val moviesForUser = ratings.keyBy(_.user).lookup(789)
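To inspect the result, we can print the user's ten highest-rated movies, reusing the titles map from step 1:
moviesForUser.sortBy(-_.rating).take(10)
  .map(rating => (titles(rating.product), rating.rating))
  .foreach(println)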
3. Take the top K items ordered by similarity score:
val sortedSims = sims.top(K)(
  Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
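Here sims is assumed to be an RDD[(Int, Double)] of (item ID, similarity score) pairs; one common way to produce such scores is cosine similarity, sketched here with Breeze (the helper name is hypothetical):
import breeze.linalg.{DenseVector, norm}
// cosine similarity between two factor vectors
def cosineSimilarity(a: DenseVector[Double], b: DenseVector[Double]): Double =
  (a dot b) / (norm(a) * norm(b))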
4. 1-of-k encoding of a categorical feature:
val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
val numCategories = categories.size
5. Construct feature vectors with the categorical feature 1-of-k encoded:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val dataCategories = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  // missing values ("?") are replaced with 0.0
  val otherFeatures = trimmed.slice(4, r.size - 1)
    .map(d => if (d == "?") 0.0 else d.toDouble)
  val features = categoryFeatures ++ otherFeatures
  LabeledPoint(label, Vectors.dense(features))
}
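As a quick check, the resulting RDD can feed any MLlib classifier; a minimal sketch with logistic regression (10 iterations is an arbitrary choice):
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
val model = LogisticRegressionWithSGD.train(dataCategories, 10)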
6. Standardize the feature vectors:
import org.apache.spark.mllib.feature.StandardScaler
val scalerCats = new StandardScaler(withMean = true, withStd = true)
  .fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp =>
  LabeledPoint(lp.label, scalerCats.transform(lp.features)))
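To verify the effect of scaling, compare the first feature vector before and after:
println(dataCategories.first.features)
println(scaledDataCats.first.features)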
7. Represent the feature vectors as a distributed matrix to compute statistics on the columns of the matrix:
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()
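The summary object then exposes per-column statistics, for example:
println(matrixSummary.mean)        // mean of each column
println(matrixSummary.variance)    // variance of each column
println(matrixSummary.numNonzeros) // non-zero entries per column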
8. Shell commands to inspect and remove the header of a CSV file:
$ head -1 hour.csv
$ sed 1d hour.csv > hour_noheader.csv
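The header-free file can then be loaded and split as usual:
val rawData = sc.textFile("hour_noheader.csv")
val records = rawData.map(line => line.split(","))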
9. Calculate the sum of squared Euclidean distances between each point and its cluster centre, summed over all clusters, using Breeze for linear algebra and vector-based numerical functions:
import breeze.linalg._
import breeze.numerics.pow
// squared Euclidean distance between two vectors
def computeDistance(v1: DenseVector[Double], v2: DenseVector[Double]) =
  pow(v1 - v2, 2).sum
val titlesWithFactors = titlesAndGenres.join(movieFactors)
val moviesAssigned = titlesWithFactors.map {
  case (id, ((title, genres), vector)) =>
    val pred = movieClusterModel.predict(vector)
    val clusterCenter = movieClusterModel.clusterCenters(pred)
    val dist = computeDistance(DenseVector(clusterCenter.toArray),
      DenseVector(vector.toArray))
    (id, title, genres.mkString(" "), pred, dist)
}
val clusterAssignments = moviesAssigned.groupBy {
  case (id, title, genres, cluster, dist) => cluster
}.collectAsMap
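To inspect the result, print the movies closest to each cluster centre (20 per cluster is an arbitrary cutoff):
for ((cluster, movies) <- clusterAssignments.toSeq.sortBy(_._1)) {
  println(s"Cluster $cluster:")
  movies.toSeq.sortBy(_._5).take(20)
    .foreach { case (_, title, genres, _, dist) => println((title, genres, dist)) }
}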
10. Use wholeTextFiles to operate on entire files at once rather than on individual lines:
val rdd = sc.wholeTextFiles(path)
// strip the "file:" URI prefix that Spark prepends to local paths
val files = rdd.map { case (filename, content) =>
  filename.replace("file:", "") }
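For instance, counting the files read and previewing one cleaned path:
println(files.count)
println(files.first)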
11. Tokenize text features (regex, stopwords, and rareTokens are assumed to be defined elsewhere, e.g. a regex matching valid tokens and sets of stop words and rare tokens):
def tokenize(line: String): Seq[String] = {
  line.split("""\W+""")
    .map(_.toLowerCase)
    .filter(token => regex.pattern.matcher(token).matches)
    .filterNot(token => stopwords.contains(token))
    .filterNot(token => rareTokens.contains(token))
    .filter(token => token.size >= 2)
    .toSeq
}
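Applying the tokenizer across a whole corpus (here text is assumed to be an RDD[String] of documents):
val tokens = text.map(doc => tokenize(doc))
println(tokens.first.take(20))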
12. Stateful streaming using updateStateByKey:
def updateState(prices: Seq[(String, Double)],
    currentTotal: Option[(Int, Double)]): Option[(Int, Double)] = {
  val currentRevenue = prices.map(_._2).sum
  val currentNumberPurchases = prices.size
  val state = currentTotal.getOrElse((0, 0.0))
  // accumulate (number of purchases, total revenue) per key
  Some((currentNumberPurchases + state._1,
    currentRevenue + state._2))
}
val users = events.map { case (user, product, price) =>
  (user, (product, price)) }
val revenuePerUser = users.updateStateByKey(updateState)
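Note that updateStateByKey requires checkpointing to be enabled on the streaming context; a minimal setup sketch (the batch interval and directory are illustrative):
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/sparkstreaming/")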