Wednesday, 18 November 2015

Appending a New Column to a Spark DataFrame with a UDF

To change the schema of a DataFrame, we can operate on its underlying RDD and then apply a new schema.
Alternatively, we can build another DataFrame and join it with the original one.
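Here is a minimal sketch of the RDD route. It assumes Spark 2.x or later, with a SparkSession named spark rather than the Spark 1.5 sqlContext, and a hypothetical people DataFrame. Each Row gets one extra value appended, and the original schema is extended with a matching StructField before the DataFrame is rebuilt.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField}

// Assumption: Spark 2.x+ running locally; "spark" replaces the 1.x sqlContext
val spark = SparkSession.builder().master("local[*]").appName("append-column-rdd").getOrCreate()
import spark.implicits._

// Hypothetical input data: a name and an age per row
val people = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")

// Append one value to every Row: here, the age plus one
val rowsExtended = people.rdd.map(row => Row.fromSeq(row.toSeq :+ (row.getInt(1) + 1)))

// Extend the original schema with a field describing the appended value
val extendedSchema = people.schema.add(StructField("ageNextYear", IntegerType, nullable = false))

// Rebuild a DataFrame from the extended rows and schema
val viaRdd = spark.createDataFrame(rowsExtended, extendedSchema)
viaRdd.show()
```

The appended values must match the declared StructField types (an Int for IntegerType here). Otherwise Spark may fail once the rows are evaluated.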

1. Appending a new column from a UDF
The most convenient approach is the withColumn(String, Column) method, which returns a new DataFrame with the extra column added.
The column expression can only reference columns of the same DataFrame, not columns of another DataFrame.
For example,

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
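When the new column really does depend on another DataFrame, one workaround is the join route: bring both columns into a single DataFrame first, then call withColumn. This sketch assumes Spark 2.x or later and two hypothetical DataFrames, df1 and df2, that share an id key. The rename to col2 is introduced here only to avoid an ambiguous column name after the join.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ running locally
val spark = SparkSession.builder().master("local[*]").appName("append-column-join").getOrCreate()
import spark.implicits._

// Hypothetical DataFrames sharing an "id" key
val df1 = Seq((1, 10), (2, 20)).toDF("id", "col")
val df2 = Seq((1, 100), (2, 200)).toDF("id", "col")

// df1.withColumn("newCol", df2("col") + 1) fails because df2("col") is not part of df1.
// Join first so both columns live in one DataFrame, then derive the new column.
val joined = df1.join(df2.withColumnRenamed("col", "col2"), Seq("id"))
val df = joined.withColumn("newCol", $"col2" + 1).drop("col2")

df.show()
```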

For more complex logic, we can apply a UDF to one or more columns.

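import org.apache.spark.sql.functions._
import sqlContext.implicits._ // enables the $"colName" syntax

// Combine two string columns into one, separated by a space
val func = udf((col1: String, col2: String) => {
  val combine = col1 + " " + col2
  combine
})

val df = inputDF.withColumn("newField", func(inputDF("colfield1"), inputDF("colfield2")))

// alternative: select all existing columns plus the UDF result
val df2 = inputDF.select($"*", func($"colfield1", $"colfield2").as("newField"))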
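Here is a self-contained version of the UDF example. It assumes Spark 2.x or later and a small hypothetical inputDF. The column names colfield1 and colfield2 are kept from the snippet above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: Spark 2.x+ running locally
val spark = SparkSession.builder().master("local[*]").appName("append-column-udf").getOrCreate()
import spark.implicits._

// Hypothetical input with the two string columns used above
val inputDF = Seq(("John", "Smith"), ("Jane", "Doe")).toDF("colfield1", "colfield2")

// Same logic as func above: join two strings with a space
val func = udf((col1: String, col2: String) => col1 + " " + col2)

// withColumn form
val withField = inputDF.withColumn("newField", func($"colfield1", $"colfield2"))

// select($"*", ...) form produces the same columns
val selected = inputDF.select($"*", func($"colfield1", $"colfield2").as("newField"))

withField.show()
```

Scala string concatenation turns a null input into the literal text "null". Rows with missing values may therefore need a null check inside the UDF.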

Registering a UDF for use in SQL queries (Spark 1.5.2):
sqlContext.udf.register("strLen", (s: String) => s.length())
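Registering the UDF makes it callable by name from SQL. This minimal sketch assumes Spark 2.x or later. There, spark.udf.register plays the role of sqlContext.udf.register, and createOrReplaceTempView replaces the 1.x registerTempTable. The names table is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+ running locally
val spark = SparkSession.builder().master("local[*]").appName("register-udf").getOrCreate()
import spark.implicits._

// Same UDF as above, registered on the SparkSession
spark.udf.register("strLen", (s: String) => s.length())

// Hypothetical table of names, exposed to SQL as a temporary view
val names = Seq("alice", "bob").toDF("name")
names.createOrReplaceTempView("names")

// The registered name is now usable in SQL queries and in SQL expression strings
val viaSql = spark.sql("SELECT name, strLen(name) AS nameLen FROM names")
val viaExpr = names.selectExpr("name", "strLen(name) AS nameLen")

viaSql.show()
```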

2. Appending a new Column with constant value

Use lit() when you need to append a constant value that does not depend on any existing column of the DataFrame:

import org.apache.spark.sql.functions._
val newDf = xmlDf.withColumn("newColumn", lit("newValue"))
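lit() accepts other literal types too. This sketch assumes Spark 2.x or later and a hypothetical stand-in for xmlDf. It appends a string, an integer, and a typed null column. Casting lit(null) gives the placeholder column a concrete type instead of NullType.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// Assumption: Spark 2.x+ running locally
val spark = SparkSession.builder().master("local[*]").appName("append-constant").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for xmlDf
val xmlDf = Seq("a", "b").toDF("id")

val newDf = xmlDf
  .withColumn("newColumn", lit("newValue"))          // string constant
  .withColumn("version", lit(2))                     // integer constant
  .withColumn("comment", lit(null).cast(StringType)) // typed null placeholder

newDf.printSchema()
```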

3. Zipping Two Columns of Array Type

// Pair up the elements of two array columns position by position
val pairTwo = udf((col1: Seq[Long], col2: Seq[String]) => col1 zip col2)

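Note: the UDF parameters must be declared as Seq[T] rather than Array[T]. Spark passes an array column to a Scala UDF as a WrappedArray, which is a Scala Seq, not an Array (a plain Java array rather than a native Scala collection).
An Array[T] parameter therefore fails at runtime with a ClassCastException.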
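Here is a usage sketch for pairTwo. It assumes Spark 2.x or later and a hypothetical DataFrame holding two parallel array columns. Because the UDF returns a Seq of tuples, Spark stores the result as an array of structs whose fields are named _1 and _2.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: Spark 2.x+ running locally
val spark = SparkSession.builder().master("local[*]").appName("zip-array-columns").getOrCreate()
import spark.implicits._

// Parameters declared as Seq[T], since Spark hands array columns to Scala UDFs as Seq values
val pairTwo = udf((col1: Seq[Long], col2: Seq[String]) => col1 zip col2)

// Hypothetical input: ids and labels stored as parallel arrays
val df = Seq((Seq(1L, 2L), Seq("a", "b"))).toDF("ids", "labels")
val zipped = df.withColumn("pairs", pairTwo($"ids", $"labels"))

// "pairs" has type array<struct<_1:bigint,_2:string>>
zipped.printSchema()
```

On Spark 2.4 and later, the built-in arrays_zip function in org.apache.spark.sql.functions performs a similar pairing without a UDF.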



  1. How can I add a timestamp dynamically to a DataFrame while running a Spark Streaming application?

  2. Hi Jack, you can use something like the following:

    1) If you are consuming from a relational database and generating a DataFrame named InsuranceBenefitsDF:

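    case class Data(name: String, date: String)

    // Map each row to a Data record stamped with the current time
    val benefitRDD = InsuranceBenefitsDF.rdd.map { x =>
      val name = x.get(0).toString // first column of the row
      val date = (new java.util.Date).toString
      Data(name, date)
    }

    val newDf = spark.createDataFrame(benefitRDD)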


    // Use the following if you need to add a new column with the current date
    val currentDatetime = new java.util.Date

    val df1 = empDf.withColumn("date", lit(currentDatetime.toString))

    Hope this helps.
    Amit Dass

  3. How can I add a new column containing just the hour extracted from an existing timestamp field?