1. Build a map of movie ID to title by splitting each line on the "|" delimiter:
val titles = movies.map(line => line.split("\\|").take(2))
  .map(array => (array(0).toInt, array(1)))
  .collectAsMap()
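The resulting map supports direct lookup of a movie title by ID (the ID here is illustrative):
println(titles(123))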
2. Look up all ratings made by a given user (here, user 789) by keying the ratings RDD on user ID:
val moviesForUser = ratings.keyBy(_.user).lookup(789)
3. Take the top K item similarities, ordered by the similarity score:
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] {
  case (id, similarity) => similarity
})
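Here, sims is assumed to be an RDD of (item ID, similarity score) pairs. A minimal sketch of how such a score might be computed, using a hand-rolled cosine similarity on factor vectors (this helper is an assumption, not from the original):
// Cosine similarity: dot(a, b) / (|a| * |b|)
def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  def norm(v: Array[Double]) = math.sqrt(v.map(x => x * x).sum)
  dot / (norm(a) * norm(b))
}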
4. 1-of-k encoding of a categorical feature: map each distinct category value to an index:
val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
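The size of this mapping gives the number of 1-of-k slots, which item 5 below relies on; a quick sketch of what the encoding produces (the index values are illustrative):
val numCategories = categories.size
// A record whose category maps to index 2 (with numCategories = 4) encodes as [0.0, 0.0, 1.0, 0.0]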
5. Construct the feature vector, combining the 1-of-k encoded categorical feature with the remaining numerical features:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

val dataCategories = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  // Set a 1.0 in the slot corresponding to this record's category
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  // Treat missing values ("?") as 0.0
  val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  val features = categoryFeatures ++ otherFeatures
  LabeledPoint(label, Vectors.dense(features))
}
6. Standardize the feature vectors (zero mean, unit standard deviation):
import org.apache.spark.mllib.feature.StandardScaler

val scalerCats = new StandardScaler(withMean = true, withStd = true)
  .fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp =>
  LabeledPoint(lp.label, scalerCats.transform(lp.features)))
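A quick sanity check of the effect of scaling, comparing a feature vector before and after (the printed values depend on the data):
println(dataCategories.first.features)
println(scaledDataCats.first.features)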
7. Represent the feature vectors as a distributed matrix to compute statistics
on the columns of the matrix.
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()
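The returned summary exposes per-column statistics, for example:
println(matrixSummary.mean)         // column means
println(matrixSummary.min)          // column minima
println(matrixSummary.max)          // column maxima
println(matrixSummary.variance)     // column variances
println(matrixSummary.numNonzeros)  // non-zero counts per column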
8. Shell commands to inspect the header of a CSV file and then strip it:
$ head -1 hour.csv
$ sed 1d hour.csv > hour_noheader.csv
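The headerless file can then be loaded and split in Spark (a minimal sketch; the local path is an assumption):
val rawData = sc.textFile("hour_noheader.csv")
val records = rawData.map(line => line.split(","))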
9. Calculate the sum of squared Euclidean distances between each point and its cluster
centre, summed over all clusters. Breeze is used for the linear algebra and vector-based numerical functions.
import breeze.linalg._
import breeze.numerics.pow

// Squared Euclidean distance between two vectors
def computeDistance(v1: DenseVector[Double], v2: DenseVector[Double]) = pow(v1 - v2, 2).sum

val titlesWithFactors = titlesAndGenres.join(movieFactors)
val moviesAssigned = titlesWithFactors.map { case (id, ((title, genres), vector)) =>
  val pred = movieClusterModel.predict(vector)
  val clusterCenter = movieClusterModel.clusterCenters(pred)
  val dist = computeDistance(DenseVector(clusterCenter.toArray), DenseVector(vector.toArray))
  (id, title, genres.mkString(" "), pred, dist)
}
val clusterAssignments = moviesAssigned.groupBy {
  case (id, title, genres, cluster, dist) => cluster
}.collectAsMap
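The assignments can then be inspected per cluster, for example by listing the movies closest to each cluster centre (a sketch; the take(5) cutoff is arbitrary):
for ((cluster, movies) <- clusterAssignments.toSeq.sortBy(_._1)) {
  println(s"Cluster $cluster:")
  // Sort this cluster's movies by distance to the centre and show the closest few
  movies.toSeq.sortBy(_._5).take(5).foreach { case (_, title, _, _, dist) =>
    println(f"  $title ($dist%1.3f)")
  }
}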
10. Use wholeTextFiles to operate on entire files at once rather than on individual lines:
val rdd = sc.wholeTextFiles(path)
// Each record is a (filename, content) pair; strip the "file:" URI scheme from the name
val files = rdd.map { case (filename, content) => filename.replace("file:", "") }
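A quick usage sketch (path is assumed to point at a directory of text files):
println(files.count)  // number of files read
val contents = rdd.map { case (_, content) => content }  // file contents for downstream processing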
11. Tokenize text features:
def tokenize(line: String): Seq[String] = {
  line.split("""\W+""")
    .map(_.toLowerCase)
    // Keep only tokens matching the allowed pattern
    .filter(token => regex.pattern.matcher(token).matches)
    // Drop stop words and rare tokens
    .filterNot(token => stopwords.contains(token))
    .filterNot(token => rareTokens.contains(token))
    .filter(token => token.size >= 2)
    .toSeq
}
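tokenize assumes regex, stopwords, and rareTokens are already in scope; one possible set of definitions (the pattern and word list here are assumptions for illustration):
// Accept only purely alphabetic tokens
val regex = """[a-z]+""".r
val stopwords = Set("the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not", "with", "as", "was", "if")
// rareTokens would typically be derived from corpus counts, e.g. tokens occurring only once
val rareTokens = Set.empty[String]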
12. Stateful streaming using updateStateByKey to keep a running (purchase count, revenue) total per user:
def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
  val currentRevenue = prices.map(_._2).sum
  val currentNumberPurchases = prices.size
  val state = currentTotal.getOrElse((0, 0.0))
  Some((currentNumberPurchases + state._1, currentRevenue + state._2))
}
val users = events.map { case (user, product, price) => (user, (product, price)) }
val revenuePerUser = users.updateStateByKey(updateState)
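updateStateByKey requires checkpointing to be enabled on the StreamingContext; a minimal surrounding sketch (the batch interval and checkpoint path are assumptions):
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/streaming-checkpoint")  // required for stateful operations
// ... define events, users, and revenuePerUser as above ...
revenuePerUser.print()
ssc.start()
ssc.awaitTermination()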