Thursday, 17 September 2015

Write Spark Dataframe to Cassandra table with different Schema

We frequently insert new data frames into a single Cassandra table, and we can't predict the schema of the Cassandra table in advance. When new columns start arriving, we don't want to manually alter the C* table schema every time.

The example below compares the schema of the incoming data frame with the schema of the C* table, finds the new columns, and adds them to the table before inserting.
The save mode is "Append" for updates; "Overwrite" truncates the table (deletes all existing rows) and then inserts.
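For reference, an overwrite looks almost the same as the append in the function further down, only with a different save mode. This is just a sketch assuming the same df, table and keyspace values as below; newer connector versions may also require the confirm.truncate option before they will truncate the table:

// Sketch: replace the whole contents of the table with df.
// "confirm.truncate" -> "true" is only needed on newer connector versions
// that guard against accidental truncation.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> table, "keyspace" -> keyspace, "confirm.truncate" -> "true"))
  .mode(SaveMode.Overwrite)
  .save()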

The Spark configuration accepts a comma-separated list of Cassandra contact points, so we can keep them in a config file. For example:

cassandra {
  host = ["127.0.0.3", "127.0.0.1", "127.0.0.2"]
  port = 9042
}

import scala.collection.JavaConverters._  // for asScala on the Java list returned by getStringList
val hosts = cassandra.getStringList("host").asScala.mkString(",")
conf.set("spark.cassandra.connection.host", hosts)
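As a fuller sketch, assuming the cassandra value is a Typesafe Config object loaded from the block above and that we also want to pick up the port, the SparkConf wiring could look like this:

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

// Hypothetical wiring: load the "cassandra" block and copy host/port into the SparkConf.
val cassandra = ConfigFactory.load().getConfig("cassandra")
val conf = new SparkConf()
  .setAppName("write-to-cassandra")
  .set("spark.cassandra.connection.host",
       cassandra.getStringList("host").asScala.mkString(","))
  .set("spark.cassandra.connection.port",
       cassandra.getInt("port").toString)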


import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeCassandra(sc: SparkContext, df: DataFrame, table: String, keyspace: String): Unit = {
  // Columns in the incoming data frame and in the existing C* table.
  val dfSchema = df.columns.toList
  val cassTable = sc.cassandraTable(keyspace, table)
  val cassSchema = cassTable.selectedColumnNames.toSet

  // Columns present in the data frame but missing from the C* table.
  val newCols = dfSchema.filterNot(cassSchema)

  if (newCols.nonEmpty) {
    cassTable.connector.withSessionDo { session =>
      // Add each missing column before writing (all new columns are created as double here).
      for (col <- newCols)
        session.execute(s"ALTER TABLE $keyspace.$table ADD $col double")
    }
  }

  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> table, "keyspace" -> keyspace))
    .mode(SaveMode.Append)
    .save()
}
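A hypothetical call site might look like the sketch below; the input path, keyspace and table names are made up for illustration:

// Hypothetical usage: build a data frame from a JSON source, then append it,
// letting writeCassandra add any new columns to the C* table first.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("hdfs:///data/events/2015-09-17.json")

writeCassandra(sc, df, table = "events", keyspace = "sensor_data")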




Reference:
https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
