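In the SBT build definition, declare the Cassandra dependencies (`guavaExclude` and `logbackExclude` are exclusion helpers defined elsewhere in the build and not shown here):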
/**
 * Cassandra-related library dependencies.
 *
 * The version values (`CassandraConnectorVersion`, `CassandraDriverVersion`, `CassandraVersion`)
 * and the `guavaExclude` / `logbackExclude` helpers are defined elsewhere in the build.
 */
object Cassandra {
  // Helpers are invoked with ordinary dot syntax instead of postfix notation, which needs the
  // `postfixOps` language feature (a warning in Scala 2.13, an error in Scala 3).
  val cassandraSparkConnector: ModuleID =
    "com.datastax.spark" %% "spark-cassandra-connector" % CassandraConnectorVersion
  val cassandraDriver: ModuleID =
    ("com.datastax.cassandra" % "cassandra-driver-core" % CassandraDriverVersion).guavaExclude
  val cassandraClient: ModuleID =
    ("org.apache.cassandra" % "cassandra-clientutil" % CassandraVersion).guavaExclude
  val cassandraAll: ModuleID =
    ("org.apache.cassandra" % "cassandra-all" % CassandraVersion).logbackExclude
}
package com.loyaltyone.exercises
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.SparkContext
import java.io.File
import com.datastax.spark.connector._
import scala.xml._
/**
 * Loads monthly precipitation values for a set of countries from XML files, saves one row per
 * country into the Cassandra table `climate.precipitation`, then prints global statistics
 * computed from that table.
 */
object PrecipitationStats {
  val KEYSPACE = "climate"
  val TABLE = "precipitation"

  /**
   * Runs the whole pipeline: validates the input directory, recreates the schema, loads and
   * stores every `*.xml` file, then prints the global statistics.
   *
   * @param args      command-line arguments; `args(1)` must be a directory of `*.xml` files whose
   *                  base name (text before the first '.') is the country code
   * @param sc        active Spark context
   * @param connector connector used to run the schema statements
   */
  def executor(args: Array[String], sc: SparkContext, connector: CassandraConnector): Unit = {
    // Validate the input before touching Cassandra, because createTables drops the keyspace.
    val inputDir = new File(args(1))
    // listFiles() returns null (not an empty array) when the path is not a readable directory.
    val xmlFiles = Option(inputDir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".xml"))
    require(xmlFiles.nonEmpty, s"No .xml files found in ${inputDir.getPath}")

    // Create keyspace and table.
    createTables(connector)

    // Build each row from a single file so the country code always matches its own values.
    // Zipping two separately built RDDs requires equal element counts per partition and breaks
    // as soon as the directory holds a non-XML file.
    val countryAndPrecipitations = sc.parallelize(xmlFiles.toSeq).flatMap { file =>
      val country = file.getName.split('.')(0)
      // Files without a matching record are skipped instead of failing the whole job.
      parseXML(XML.loadFile(file)).headOption.map {
        case (jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec) =>
          (country, jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec)
      }.toList
    }

    // Save the data in the Cassandra table. Rows must be tuples, not Seqs.
    countryAndPrecipitations.saveToCassandra(KEYSPACE, TABLE,
      SomeColumns("country", "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"))

    val precipitationTable = sc.cassandraTable(KEYSPACE, TABLE)
    val months = Seq("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")

    // Compute and output the statistics in the same column order as the header.
    val (total, avg, (dryCountry, dryTotal), (humidCountry, humidTotal)) =
      getGlobalStats(months, precipitationTable)
    println("Global total precipitation | Global Avg precipitation | Min precipitation country | Max precipitation country")
    println(s"$total | $avg | $dryCountry ($dryTotal) | $humidCountry ($humidTotal)")
  }

  /**
   * Drops (if present) and recreates the keyspace, then creates the precipitation table with
   * one text key column `country` and one double column per month.
   *
   * @param connector connector used to obtain a session for the schema statements
   */
  def createTables(connector: CassandraConnector): Unit = {
    connector.withSessionDo { session =>
      // IF EXISTS: a plain DROP fails on the first run, when the keyspace does not exist yet.
      session.execute(s"DROP KEYSPACE IF EXISTS $KEYSPACE")
      session.execute(s"CREATE KEYSPACE $KEYSPACE WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"""
CREATE TABLE IF NOT EXISTS $KEYSPACE.$TABLE (
country text,
jan double,
feb double,
mar double,
apr double,
may double,
jun double,
jul double,
aug double,
sep double,
oct double,
nov double,
dec double,
PRIMARY KEY (country)
)
""")
    }
  }

  /**
   * Extracts monthly values from an XML document.
   *
   * Selects the `domain.web.MonthlyGcmDatum` children whose `gcm` element text is `bccr_bcm2_0`.
   * Each one's `monthVals/double` values become a 12-tuple via [[toTuple]]. The result is empty
   * when no element matches.
   *
   * @param xmlDoc parsed XML document for one country
   * @return one tuple of twelve monthly values per matching element
   */
  def parseXML(xmlDoc: Elem) = {
    val datum = xmlDoc \ "domain.web.MonthlyGcmDatum"
    val bccr = datum.filter(x => (x \ "gcm").text == "bccr_bcm2_0")
    bccr.map(gcm => gcm \ "monthVals")
      .map(month => (month \ "double").map(pr => pr.text.toDouble))
      .map(toTuple(_))
  }

  /**
   * Converts exactly twelve values (January to December) into a 12-tuple.
   * Any other number of values yields a tuple of twelve zeros.
   */
  def toTuple(x: Seq[Double]) = {
    x match {
      case Seq(jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec) =>
        (jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec)
      case _ => (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
    }
  }

  /**
   * Computes global statistics over per-country yearly totals.
   *
   * Each row's yearly total is the sum of its `months` columns.
   *
   * @param months column names to sum for each row
   * @param table  rows containing a `country` column and the `months` columns
   * @return (sum of all yearly totals, average yearly total, (country, total) with the smallest
   *         total, (country, total) with the largest total); the min/max throw on an empty table
   */
  def getGlobalStats(months: Seq[String],
                     table: CassandraRDD[CassandraRow]): (Double, Double, (String, Double), (String, Double)) = {
    val countryStats = table.map { r =>
      (r.getString("country"), months.map(m => r.getDouble(m)).sum)
    }.cache()

    // min/max are single distributed reductions; no full sort or collect to the driver.
    val byYearlyTotal = Ordering.by[(String, Double), Double](_._2)
    val dryCountry = countryStats.min()(byYearlyTotal)
    val humidCountry = countryStats.max()(byYearlyTotal)
    val total = countryStats.map(_._2).sum()
    val avg = total / countryStats.count()

    countryStats.unpersist()
    (total, avg, dryCountry, humidCountry)
  }
}
Please note: each row inserted into Cassandra must be a tuple, not a `Seq[...]`; otherwise the job fails with the error shown below.
If you get `<none> is not a term`, it may be because the information about the row class was not properly serialized.
Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
No comments:
Post a Comment