Or generate another data frame and then join it with the original data frame.
1. Appending a new column from a UDF
The most convenient approach is to use the withColumn(String, Column) method, which returns a new data frame by adding a new column.
It can only operate on columns of the same data frame, not on a column from another data frame.
For example,
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
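When the extra column really does come from another data frame, the join approach mentioned above is the workaround. A minimal sketch (the local setup and the shared "id" key column are assumptions, not from the original post):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: local Spark setup and the "id" key column are assumptions.
val sc = new SparkContext(new SparkConf().setAppName("join-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df1 = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "v1")
val df2 = sc.parallelize(Seq((1, 10), (2, 20))).toDF("id", "col")

// Join on the shared key first, then derive the new column from df2's data.
val joined = df1.join(df2, df1("id") === df2("id"))
val df = joined.withColumn("newCol", df2("col") + 1)
```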
For a more complex case, we can apply a UDF to the column.
import org.apache.spark.sql.functions._

val func = udf((col1: String, col2: String) => {
  val combine = col1 + " " + col2
  anotherFunc(combine)
})

df.withColumn("newField", func(inputDF("colfield1"), inputDF("colfield2")))

// alternative
df.select($"*", func($"colfield1", $"colfield2").as("newField"))
Register a UDF in Spark 1.5.2
sqlContext.udf.register("strLen", (s: String) => s.length())
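Once registered, the UDF can be used from SQL queries, or on DataFrame columns via callUDF. A sketch assuming a local Spark 1.5.x context and a hypothetical "people" table (neither is from the original post):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.callUDF

// Sketch only: the local setup and the "people" table are assumptions.
val sc = new SparkContext(new SparkConf().setAppName("udf-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val people = sc.parallelize(Seq("Alice", "Bob")).toDF("name")
people.registerTempTable("people")

sqlContext.udf.register("strLen", (s: String) => s.length())

// Use the registered UDF in SQL...
sqlContext.sql("SELECT name, strLen(name) AS nameLen FROM people").show()

// ...or directly on a column.
people.withColumn("nameLen", callUDF("strLen", people("name"))).show()
```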
2. Appending a new Column with constant value
Sometimes you need to append a constant value that is not related to the existing columns of the data frame.
import org.apache.spark.sql.functions._

val newDf = xmlDf.withColumn("newColumn", lit("newValue"))
3. Zip Two Columns with Array type
val pairTwo = udf((col1: Seq[Long], col2: Seq[String]) => col1 zip col2)
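Applying the UDF is the same withColumn pattern as before. A sketch (the local setup and the "ids"/"tags" column names are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Sketch only: local setup and the "ids"/"tags" columns are assumptions.
val sc = new SparkContext(new SparkConf().setAppName("zip-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  (Seq(1L, 2L), Seq("a", "b"))
)).toDF("ids", "tags")

val pairTwo = udf((col1: Seq[Long], col2: Seq[String]) => col1 zip col2)

// Each row's two arrays are zipped element-wise into an array of (Long, String) pairs.
df.withColumn("pairs", pairTwo(df("ids"), df("tags"))).show()
```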
Note: we have to use Seq[T] instead of Array[T], because Spark passes an array column into a UDF as a WrappedArray, which is not an Array (an Array is a plain old Java array, not a native Scala collection).

Reference:
https://github.com/spirom/LearningSpark/blob/master/src/main/scala/sql/UDF.scala
http://www.scriptscoop.net/t/6038b4902459/scala-append-a-column-to-data-frame-in-apache-spark-1-3.html
http://stackoverflow.com/questions/34539068/how-do-i-convert-a-wrappedarray-column-in-spark-dataframe-to-strings
http://stackoverflow.com/questions/29406913/how-to-use-constant-value-in-udf-of-spark-sqldataframe
How to add a timestamp dynamically to a data frame while running a Spark Streaming application?
Hi Jack, you can use something like below.
1) If you are consuming from a relational database and generating a DF named InsuranceBenefitsDF:
case class Data(name: String, date: String)

val benefitRDD = InsuranceBenefitsDF.rdd.map(x => {
  val col = x.toString().split(",")
  val name = col(0)
  val date = (new java.util.Date).toString
  Data(name, date)
})
val newDf = spark.createDataFrame(benefitRDD)
newDf.createOrReplaceTempView("newTablewithDate")
// Use below if you need to add a new column with Current Date
val currentDatetime = new java.util.Date
val df1=empDf.withColumn("date",lit(currentDatetime.toString))
Hope this helps ..
Amit Dass
Blogger: https://bigdatajourney.blogspot.in/
How can I add a new column containing just the hours extracted from an existing timestamp field?
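One way to do this is the built-in hour() function from org.apache.spark.sql.functions. A sketch (the local setup and the "ts" column name are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.hour

// Sketch only: local setup and the "ts" timestamp column are assumptions.
val sc = new SparkContext(new SparkConf().setAppName("hour-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq("2020-01-01 13:45:00"))
  .toDF("tsString")
  .withColumn("ts", $"tsString".cast("timestamp"))

// hour() extracts the hour-of-day (0-23) from a timestamp column.
df.withColumn("eventHour", hour(df("ts"))).show()
```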