Sunday 29 May 2016

Spark Parses XML into Cassandra

Below is an example that uses Spark to parse XML data and save it into Cassandra, and then reads the Cassandra table back to compute aggregate statistics.


In the SBT file:

// sbt dependency definitions for the Cassandra / Spark integration.
// NOTE(review): CassandraConnectorVersion, CassandraDriverVersion and
// CassandraVersion are version constants defined elsewhere in the build;
// guavaExclude / logbackExclude are presumably ModuleID decorators that add
// exclude rules for conflicting guava/logback transitive dependencies —
// TODO confirm against the full build definition.
object Cassandra {
      // %% appends the Scala binary version to the artifact name (Scala lib);
      // a single % is used for plain Java artifacts below.
      val cassandraSparkConnector    = "com.datastax.spark"     %% "spark-cassandra-connector"   % CassandraConnectorVersion
      val cassandraDriver       = "com.datastax.cassandra"  % "cassandra-driver-core"       % CassandraDriverVersion     guavaExclude
      val cassandraClient       = "org.apache.cassandra"    % "cassandra-clientutil"        % CassandraVersion           guavaExclude
      val cassandraAll          = "org.apache.cassandra"    % "cassandra-all"               % CassandraVersion           logbackExclude
    }


package com.loyaltyone.exercises

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.SparkContext
import java.io.File
import com.datastax.spark.connector._
import scala.xml._

object PrecipitationStats {

  val KEYSPACE = "climate"
  val TABLE = "precipitation"

  // Fallback row used when an XML file contains no usable precipitation data
  // (e.g. no entry for the GCM we filter on).
  private val EmptyYear =
    (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

  /**
   * Parses per-country climate XML files, stores the monthly precipitation
   * values in Cassandra, then computes and prints global statistics.
   *
   * @param args      args(1) must be a directory containing one XML file per
   *                  country, named `<countryCode>.xml`
   * @param sc        active Spark context
   * @param connector Cassandra connection used to (re)create the schema
   */
  def executor(args: Array[String], sc: SparkContext, connector: CassandraConnector): Unit = {

    // (Re)create keyspace and table before loading.
    createTables(connector)

    // List the XML files ONCE so the country codes and the parsed contents
    // are derived from the same file list in the same order. The original
    // called listFiles twice and zipped two independently built RDDs, which
    // silently relied on the directory listing order being stable across
    // calls — RDD.zip requires identical ordering to pair elements correctly.
    val xmlFiles = new File(args(1)).listFiles.filter(_.getName.endsWith(".xml"))

    // Parse each file; fall back to an all-zero year when a file yields no
    // matching data instead of throwing on l(0) for an empty Seq.
    val precipitations = sc.parallelize(xmlFiles)
      .map(XML.loadFile)
      .map(parseXML)
      .map(_.headOption.getOrElse(EmptyYear))

    // Country code = file name without its extension.
    val countries = sc.parallelize(xmlFiles.map(_.getName.split('.')(0)))

    // Flatten (country, Tuple12) into a single 13-element tuple: the
    // connector's column mapper needs a tuple/case class, not a Seq
    // (see the ScalaReflectionException note below the code).
    val countryAndPrecipitations = countries.zip(precipitations).map {
      case (country, p) =>
        (country, p._1, p._2, p._3, p._4, p._5, p._6,
                  p._7, p._8, p._9, p._10, p._11, p._12)
    }

    // Save one row per country.
    countryAndPrecipitations.saveToCassandra(KEYSPACE, TABLE,
      SomeColumns("country","jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec"))

    // Read the table back; cached because getGlobalStats traverses it
    // several times (min, max, sum, count).
    val precipitationTable = sc.cassandraTable(KEYSPACE, TABLE)
    precipitationTable.cache()

    val months = Seq("jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec")

    // Compute and output the precipitation stats.
    val globalStats = getGlobalStats(months, precipitationTable)

    println("Global total precipitation | Global Avg precipitation | Min precipitation country | Max precipitation country")
    println(globalStats)
  }

  /** Drops and recreates the keyspace, then creates the precipitation table. */
  def createTables(connector: CassandraConnector): Unit = {

    connector.withSessionDo { session =>
      // IF EXISTS: the original unconditional DROP fails on a cluster where
      // the keyspace does not exist yet (i.e. on the very first run).
      session.execute(s"DROP KEYSPACE IF EXISTS $KEYSPACE")
      session.execute(s"CREATE KEYSPACE $KEYSPACE WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      session.execute(s"""
         CREATE TABLE IF NOT EXISTS $KEYSPACE.$TABLE  (
         country text,
         jan double,
         feb double,
         mar double,
         apr double,
         may double,
         jun double,
         jul double,
         aug double,
         sep double,
         oct double,
         nov double,
         dec double,
         PRIMARY KEY (country)
       )
        """)
    }
  }

  /**
   * Extracts the twelve monthly precipitation values for the "bccr_bcm2_0"
   * GCM from a parsed climate XML document.
   *
   * @return one Tuple12 per matching GCM entry; empty when the document has
   *         no entry for that GCM (callers must handle the empty case)
   */
  def parseXML(xmlDoc: Elem) = {

    val datum = xmlDoc \ "domain.web.MonthlyGcmDatum"

    // Keep only the entries produced by the bccr_bcm2_0 climate model.
    val bccr = datum.filter(d => (d \ "gcm").text == "bccr_bcm2_0")

    bccr.map(gcm => gcm \ "monthVals")
      .map(monthVals => (monthVals \ "double").map(_.text.toDouble))
      .map(toTuple)
  }

  /**
   * Converts a sequence of exactly 12 doubles into a Tuple12; any other
   * length yields an all-zero tuple. A tuple (not a Seq) is required so the
   * Cassandra connector's column mapper can bind each value to a column.
   */
  def toTuple(x: Seq[Double]) =
    x match {
      case Seq(jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec) =>
        (jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec)
      case _ =>
        (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
    }

  /**
   * Computes global precipitation statistics from the Cassandra table.
   *
   * @param months the twelve monthly column names to sum per country
   * @param table  the precipitation table as an RDD of Cassandra rows
   * @return (global total, average per country,
   *          (driest country, its total), (wettest country, its total))
   */
  def getGlobalStats(months: Seq[String], table: CassandraRDD[CassandraRow]) = {

    // Yearly precipitation total per country.
    val countryStats = table.map { row =>
      (row.getString("country"), months.map(m => row.getDouble(m)).sum)
    }

    // Reused by min, max, sum and count below.
    countryStats.cache()

    val byTotal = Ordering.by[(String, Double), Double](_._2)

    // min/max replace the original's two full distributed sorts followed by
    // collect.head — same result, far less work shuffled.
    val dryCountry = countryStats.min()(byTotal)
    val humidCountry = countryStats.max()(byTotal)

    val total = countryStats.map(_._2).sum()
    val avg = total / countryStats.count()

    (total, avg, dryCountry, humidCountry)
  }
}

Please note: each row inserted into Cassandra must be a tuple type rather than a Seq. Otherwise, the error below is thrown.
If you get "<none> is not a term", it may be because the information about the class could not be properly serialized by the connector's column mapper.
 
Exception in thread "main" scala.ScalaReflectionException: is not a term at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84) at com.datastax.spark.connector.util.Reflect$.methodSymbol(Reflect.scala:12) at com.datastax.spark.connector.util.ReflectionUtil$.constructorParams(ReflectionUtil.scala:61) at com.datastax.spark.connector.mapper.DefaultColumnMapper.(DefaultColumnMapper.scala:43) at com.datastax.spark.connector.mapper.LowPriorityColumnMapper$class.defaultColumnMapper(ColumnMapper.scala:47) at com.datastax.spark.connector.mapper.ColumnMapper$.defaultColumnMapper(ColumnMapper.scala:51)


Reference:
http://stackoverflow.com/questions/14722860/convert-a-scala-list-to-a-tuple
https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/8r-HgIqtWrk

No comments:

Post a Comment