Wednesday 18 November 2015

Append a New Column to a Spark DataFrame with a UDF

To change the schema of a DataFrame, we can operate on its underlying RDD and then apply a new schema,
or generate another DataFrame and join it with the original one.
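
For instance, the RDD route might look like this (a minimal sketch; the column name "newCol" and the appended literal 1 are hypothetical):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Rebuild every Row with one extra field, then apply the extended schema.
val newRdd = df1.rdd.map(r => Row.fromSeq(r.toSeq :+ 1))
val newSchema = StructType(df1.schema.fields :+ StructField("newCol", IntegerType, nullable = false))
val newDf = sqlContext.createDataFrame(newRdd, newSchema)

And the join route builds the new column in a second DataFrame, then joins it back on a shared key (assuming a key column "id"):

val withNew = df1.join(df1.select(df1("id"), (df1("col") + 1).as("newCol")), "id")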

1. Appending a new column from a UDF

The most convenient approach is the withColumn(String, Column) method, which returns a new DataFrame with the new column added.
It can only operate on columns of the same DataFrame, not on a column of another DataFrame.
For example,

val df = df1.withColumn("newCol", df1("col") + 1) // OK: the column comes from df1 itself
val df = df1.withColumn("newCol", df2("col") + 1) // FAIL: the column belongs to another DataFrame

For a more complex case, we can apply a UDF to the columns.

import org.apache.spark.sql.functions._

val func = udf((col1: String, col2: String) => {
  val combined = col1 + " " + col2
  anotherFunc(combined)
})

df.withColumn("newField", func(df("colfield1"), df("colfield2")))

// alternative: the $ column syntax requires import sqlContext.implicits._
df.select($"*", func($"colfield1", $"colfield2").as("newField"))

To register a UDF for use in SQL queries (Spark 1.5.2):
sqlContext.udf.register("strLen", (s: String) => s.length())
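
Once registered, the function can be called from SQL (a sketch; the temp table "people" and its string column "name" are hypothetical):

// Assumes a registered temp table "people" with a string column "name".
sqlContext.sql("SELECT name, strLen(name) AS nameLen FROM people")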

2. Appending a new column with a constant value

When you need to append a constant value that is not related to the existing columns of the DataFrame, use the lit function:

import org.apache.spark.sql.functions._
val newDf = xmlDf.withColumn("newColumn", lit("newValue"))
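
lit also lets you pass a constant into a UDF, since every UDF argument must be a Column (a sketch; addN and the column "count" are hypothetical):

// Pass a constant as a Column argument to a UDF via lit().
val addN = udf((x: Int, n: Int) => x + n)
val shifted = xmlDf.withColumn("shifted", addN(col("count"), lit(5)))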

3. Zipping two columns of Array type

val pairTwo = udf((col1: Seq[Long], col2: Seq[String]) => col1 zip col2)

Note: We have to declare the UDF parameters as Seq[T] instead of Array[T], because Spark passes an array
column to a Scala UDF as a WrappedArray, which is a Seq but not an Array (a plain old Java array, not a native Scala collection).
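
A quick usage sketch (the array columns "ids" and "names" are hypothetical); the zipped pairs can then be flattened with explode:

// Zip two array columns into an array of structs, then flatten the pairs.
val zipped = df.withColumn("pairs", pairTwo(df("ids"), df("names")))
val flattened = zipped.select(explode(zipped("pairs")).as("pair"))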


Reference:
https://github.com/spirom/LearningSpark/blob/master/src/main/scala/sql/UDF.scala
http://www.scriptscoop.net/t/6038b4902459/scala-append-a-column-to-data-frame-in-apache-spark-1-3.html
http://stackoverflow.com/questions/34539068/how-do-i-convert-a-wrappedarray-column-in-spark-dataframe-to-strings
http://stackoverflow.com/questions/29406913/how-to-use-constant-value-in-udf-of-spark-sqldataframe

3 comments:

  1. How to add a timestamp dynamically to a DataFrame while running a Spark Streaming application?

  2. Hi Jack, you can use something like below:

    1) If you are consuming from a relational database and generating a DF named InsuranceBenefitsDF:

    case class Data(name: String, date: String)

    val benefitRDD = InsuranceBenefitsDF.rdd.map(x => {
      val name = x.getString(0) // read the first column directly from the Row
      val date = (new java.util.Date).toString
      Data(name, date) // build a Data instance for each row
    })

    val newDf = spark.createDataFrame(benefitRDD)
    newDf.createOrReplaceTempView("newTablewithDate")

    // Use the below if you need to add a new column with the current date
    // (lit comes from org.apache.spark.sql.functions._)
    val currentDatetime = new java.util.Date
    val df1 = empDf.withColumn("date", lit(currentDatetime.toString))

    Hope this helps.
    Amit Dass
    Blogger: https://bigdatajourney.blogspot.in/

  3. How can I add a new column containing just the hours extracted from an existing timestamp field?
