Sunday 22 March 2015

DataFrame in Spark 1.3

In Spark 1.3, SchemaRDD has been renamed to DataFrame.
A DataFrame is a distributed collection of data organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
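As a rough illustration of building a DataFrame from a structured source, the sketch below uses the Spark 1.3 method SQLContext.jsonRDD, which infers column names and types from JSON records held in an RDD[String] (jsonFile(path) does the same for a file on disk). The records are kept in memory so that no input file is needed; the object and method names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch (Spark 1.3 API): build a DataFrame from JSON records and query it by column.
// JsonDataFrameSketch and adultNames are hypothetical names chosen for illustration.
object JsonDataFrameSketch {
  // Returns the names of people older than 21; the schema is inferred from the JSON.
  def adultNames(sqlContext: SQLContext, jsonLines: Seq[String]): Array[String] = {
    val df = sqlContext.jsonRDD(sqlContext.sparkContext.parallelize(jsonLines))
    df.printSchema() // prints the inferred column names and types
    // Records without an "age" field have a null age and are filtered out.
    df.filter(df("age") > 21).select("name").collect().map(_.getString(0))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JsonDataFrameSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val records = Seq(
      """{"name":"Michael"}""",
      """{"name":"Andy", "age":30}""",
      """{"name":"Justin", "age":19}""")
    println(adultNames(sqlContext, records).mkString(", "))
    sc.stop()
  }
}
```

The same column-oriented calls (select, filter, groupBy) work on a DataFrame regardless of which source it was built from.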

Spark SQL supports two different methods for converting existing RDDs into DataFrames.

1. The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns.

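import sqlContext.implicits._  // needed for .toDF() in Spark 1.3
case class Person(name: String, age: Int)
// Create an RDD of Person objects, convert it to a DataFrame and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")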
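A self-contained sketch of the reflection-based approach, assuming a local Spark 1.3 setup: the case class is declared at the top level so that Spark can inspect its fields via reflection, toDF() comes from import sqlContext.implicits._, and the registered table is then queried with SQL. The object name ReflectionSchemaSketch and the sample lines are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case class: Spark uses reflection on its fields to derive the schema.
case class Person(name: String, age: Int)

// Sketch of the reflection-based conversion (Spark 1.3 API); names are illustrative.
object ReflectionSchemaSketch {
  // Parses "name, age" lines into Person objects, converts them to a DataFrame,
  // registers it as a temporary table and selects the teenagers with SQL.
  def teenagerNames(sqlContext: SQLContext, lines: Seq[String]): Array[String] = {
    import sqlContext.implicits._ // provides toDF() for RDDs of case classes
    val people = sqlContext.sparkContext.parallelize(lines)
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    people.printSchema() // columns "name" and "age", named after the case class fields
    people.registerTempTable("people")
    sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      .collect()
      .map(_.getString(0))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReflectionSchemaSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    println(teenagerNames(sqlContext, Seq("Michael, 29", "Andy, 30", "Justin, 19")).mkString(", "))
    sc.stop()
  }
}
```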

2. The programmatic interface allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it lets you construct DataFrames when the columns and their types are not known until runtime.

  1. Create an RDD of Rows from the original RDD.
  2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
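// Create an RDD of raw text lines; here people is an RDD[String], not the DataFrame above.
val people = sc.textFile("examples/src/main/resources/people.txt")
// Import Row and the Spark SQL data types (moved to org.apache.spark.sql.types in Spark 1.3).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// Generate the schema based on the string of schema: every column is a nullable string.
val schemaString = "name age"
val schema = StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)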
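The example above declares every column as a nullable StringType. When the column types are also only known at runtime, a sketch like the following could build the schema from a type description and convert each value before creating the Row. The "name:type" specification format and the helper names are hypothetical, and only string and int columns are handled.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._

// Sketch (Spark 1.3 API): schema and column types supplied as a runtime string.
object RuntimeSchemaSketch {
  // Parses a hypothetical "column:type,column:type" spec; only "string" and "int" are supported.
  def parseSchema(spec: String): StructType =
    StructType(spec.split(",").map { field =>
      val Array(name, typeName) = field.trim.split(":")
      val dataType: DataType = typeName match {
        case "string" => StringType
        case "int"    => IntegerType
        case other    => throw new IllegalArgumentException(s"unsupported type: $other")
      }
      StructField(name, dataType, nullable = true)
    })

  // Converts each raw string to the JVM type Spark expects for the column's DataType.
  def toRow(values: Array[String], schema: StructType): Row = {
    require(values.length == schema.fields.length, s"expected ${schema.fields.length} fields")
    Row.fromSeq(values.zip(schema.fields).map { case (v, f) =>
      f.dataType match {
        case IntegerType => v.trim.toInt
        case _           => v.trim
      }
    }.toSeq)
  }

  def build(sqlContext: SQLContext, lines: Seq[String], spec: String): DataFrame = {
    val schema = parseSchema(spec)
    val rowRDD = sqlContext.sparkContext.parallelize(lines).map(line => toRow(line.split(","), schema))
    sqlContext.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RuntimeSchemaSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val df = build(sqlContext, Seq("Michael, 29", "Andy, 30"), "name:string,age:int")
    df.printSchema()
    df.collect().foreach(println)
    sc.stop()
  }
}
```

Converting values up front matters because each Row value is expected to match its column's DataType (for example an Int for an IntegerType column); a mismatch typically surfaces only when an action such as collect() runs, not when createDataFrame is called.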

2 comments:

  1. If I have 500 columns in my schema, do I have to repeat the column indices 500 times while doing the map, i.e. p(0), p(1) ... p(499), as below?
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    I tried options like map(p=>row(p)) and map(p=>Row.fromSeq(p.toSeq)),
    but they didn't work when creating the DataFrame. Please let me know if there are any other alternatives for dynamically specifying the schema, regardless of the number of columns, when using the createDataFrame method to create a DataFrame from the RDD.
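One possible answer to the question above: Row.fromSeq accepts a sequence of any length, so listing p(0) ... p(499) is not necessary. Since the failing code is not shown in full, the cause is an assumption, but a common pitfall is that String.split(",") drops trailing empty strings, so a line ending in an empty field yields fewer values than the schema has columns. The sketch below keeps empty fields with split(",", -1) and checks each row's width; the object name and the generated column names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch (Spark 1.3 API): an all-string DataFrame for any number of columns.
object WideRowsSketch {
  def build(sqlContext: SQLContext, lines: Seq[String], columnNames: Seq[String]): DataFrame = {
    // One nullable string column per name; no per-column code is needed.
    val schema = StructType(columnNames.map(name => StructField(name, StringType, nullable = true)).toArray)
    val width = columnNames.length
    val rowRDD = sqlContext.sparkContext.parallelize(lines).map { line =>
      // limit -1 keeps trailing empty fields, so "a,b," yields 3 values instead of 2
      val values = line.split(",", -1).map(_.trim)
      require(values.length == width, s"expected $width fields but got ${values.length}: $line")
      Row.fromSeq(values)
    }
    sqlContext.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WideRowsSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    // Column names could come from a header line; here they are generated (c1, c2, c3).
    val columnNames = (1 to 3).map(i => s"c$i")
    val df = build(sqlContext, Seq("a, b, c", "d, e,"), columnNames)
    df.collect().foreach(println)
    sc.stop()
  }
}
```

If some columns need non-string types, the values can be converted per column before Row.fromSeq, as long as each value matches its StructField's DataType.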
