The example below compares the schema of the current DataFrame with that of the C* table, finds any new columns, and adds them to the table before inserting.
The save mode "Append" is used for inserts and updates; "Overwrite" deletes all existing rows (truncating the table) before inserting.
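For example, an "Overwrite" write must explicitly confirm the truncation via the "confirm.truncate" option. A minimal sketch, assuming df is a DataFrame already in scope and using placeholder table/keyspace names:

import org.apache.spark.sql.SaveMode

// SaveMode.Overwrite truncates the C* table first; the connector
// requires explicit confirmation via the "confirm.truncate" option.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "my_table",        // placeholder table name
    "keyspace" -> "my_keyspace",  // placeholder keyspace name
    "confirm.truncate" -> "true"))
  .mode(SaveMode.Overwrite)
  .save()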
The Spark conf property spark.cassandra.connection.host accepts a comma-separated list of Cassandra contact points. For example, given the following config block:
cassandra {
host = ["127.0.0.3", "127.0.0.1", "127.0.0.2"]
port = 9042
}
import scala.collection.JavaConverters._

// "cassandra" is the Config sub-tree for the block above
val hosts = cassandra.getStringList("host").asScala.mkString(",")
conf.set("spark.cassandra.connection.host", hosts)
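Putting it together, a minimal sketch, assuming the Typesafe Config library and that the block above lives in application.conf (the file name and app name are placeholders):

import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// Load the "cassandra" block from application.conf (assumed location)
val cassandra = ConfigFactory.load().getConfig("cassandra")
val hosts = cassandra.getStringList("host").asScala.mkString(",")

val conf = new SparkConf()
  .setAppName("cassandra-writer") // placeholder app name
  .set("spark.cassandra.connection.host", hosts)
  .set("spark.cassandra.connection.port", cassandra.getInt("port").toString)
val sc = new SparkContext(conf)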
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeCassandra(sc: SparkContext, df: DataFrame, table: String, keyspace: String): Unit = {
  val dfSchema = df.columns.toList
  val cassTable = sc.cassandraTable(keyspace, table)
  val cassSchema = cassTable.selectedColumnNames.toSet
  // Columns present in the DataFrame but missing from the C* table
  val newCols = dfSchema.filterNot(cassSchema)
  if (newCols.nonEmpty) {
    cassTable.connector.withSessionDo { session =>
      // Assumes all new columns are doubles, as in this example
      for (col <- newCols)
        session.execute(s"ALTER TABLE $keyspace.$table ADD $col double")
    }
  }
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map(
      "table" -> table,
      "keyspace" -> keyspace))
    .mode(SaveMode.Append)
    .save()
}
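A call might look like the following sketch; the DataFrame contents and the table/keyspace names "scores"/"demo" are made up:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("writer-demo") // placeholder app name
  .getOrCreate()
import spark.implicits._

// A two-column DataFrame; writeCassandra alters the C* table to add
// any column it is missing, then appends the rows.
val df = Seq(("a", 1.0), ("b", 2.0)).toDF("id", "score")
writeCassandra(spark.sparkContext, df, "scores", "demo")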
References:
https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md