Thursday, 17 September 2015

Write Spark Dataframe to Cassandra table with different Schema

When we frequently insert new data frames into a single Cassandra table, we can't predict the schema of the Cassandra table in advance. When new columns arrive, we don't want to manually alter the C* table schema.

The example below compares the schemas of the current data frame and the C* table, finds the new columns, and adds them to the table before inserting.
Save mode "Append" is used for updates and inserts; "Overwrite" deletes all existing rows and then inserts.

Spark conf supports a list of Cassandra contact points. For example,

cassandra {
  host = ["host1", "host2", "host3"]
  port = 9042
}

val hosts = cassandra.getStringList("host").asScala.mkString(",")
conf.set("spark.cassandra.connection.host", hosts)

/**
 * Writes a DataFrame to a Cassandra table, first evolving the table schema:
 * any column present in the DataFrame but missing from the Cassandra table
 * is added via `ALTER TABLE` before the write.
 *
 * @param sc       active SparkContext (used to reach the Cassandra connector)
 * @param df       the DataFrame to persist
 * @param table    target Cassandra table name
 * @param keyspace target Cassandra keyspace
 */
def writeCassandra(sc: SparkContext, df: DataFrame, table: String, keyspace: String): Unit = {
  val dfSchema = df.columns.toList
  val cassTable = sc.cassandraTable(keyspace, table)
  val cassSchema = cassTable.selectedColumnNames.toSet
  // Columns in the DataFrame that do not yet exist in the Cassandra table.
  val newCols = dfSchema.filterNot(cassSchema)

  if (newCols.nonEmpty) {
    // Reuse the connector from the table handle to run DDL statements.
    cassTable.connector.withSessionDo { session =>
      // NOTE(review): every new column is created as `double` — confirm this
      // matches the actual DataFrame column types before relying on it.
      for (col <- newCols)
        session.execute(s"ALTER TABLE $keyspace.$table ADD $col double")
    }
  }

  // "append" updates/inserts rows; the original snippet was missing
  // .mode(...).save(), so the write was never actually executed.
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> table, "keyspace" -> keyspace))
    .mode("append")
    .save()
}


No comments:

Post a Comment