The example below compares the schema of the incoming DataFrame with the schema of the C* table, and adds any missing columns to the table before inserting. The write uses SaveMode.Append, which upserts rows; SaveMode.Overwrite instead truncates the table (deleting all rows) before inserting.
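For reference, an Overwrite write looks like the minimal sketch below (assuming df, table, and keyspace are already in scope; the connector refuses to truncate unless the confirm.truncate option is set, per the data frames doc linked at the end):

import org.apache.spark.sql.SaveMode

// Truncate the target table, then insert all rows of df.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> table,
    "keyspace" -> keyspace,
    "confirm.truncate" -> "true"))  // required by the connector for Overwrite
  .mode(SaveMode.Overwrite)
  .save()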
Spark conf accepts a list of Cassandra contact points, passed as a comma-separated string. For example, given a config block:
cassandra {
host = ["127.0.0.3", "127.0.0.1", "127.0.0.2"]
port = 9042
}
import scala.collection.JavaConverters._  // for .asScala on the Java list

val hosts = cassandra.getStringList("host").asScala.mkString(",")
conf.set("spark.cassandra.connection.host", hosts)
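Putting it together, a self-contained sketch, assuming the cassandra block above is loaded with Typesafe Config (com.typesafe.config); the app name is hypothetical:

import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf

// Load the cassandra block from application.conf.
val cassandra = ConfigFactory.load().getConfig("cassandra")
val conf = new SparkConf()
  .setAppName("cassandra-writer")  // hypothetical app name
  .set("spark.cassandra.connection.host",
    cassandra.getStringList("host").asScala.mkString(","))
  .set("spark.cassandra.connection.port", cassandra.getInt("port").toString)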
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeCassandra(sc: SparkContext, df: DataFrame, table: String, keyspace: String): Unit = {
  // Columns present in the DataFrame but missing from the Cassandra table.
  val dfSchema = df.columns.toList
  val cassTable = sc.cassandraTable(keyspace, table)
  val cassSchema = cassTable.selectedColumnNames.toSet
  val newCols = dfSchema.filterNot(cassSchema)
  if (newCols.nonEmpty) {
    // ALTER the table to add each new column before writing; every new
    // column is created as double here.
    cassTable.connector.withSessionDo { session =>
      for (col <- newCols)
        session.execute(s"ALTER TABLE $keyspace.$table ADD $col double")
    }
  }
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> table, "keyspace" -> keyspace))
    .mode(SaveMode.Append)
    .save()
}
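Usage is a single call, assuming sc and a DataFrame df are already available ("analytics" and "metrics" are hypothetical names):

writeCassandra(sc, df, table = "metrics", keyspace = "analytics")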
References:
https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md