Friday, 19 June 2015

SchemaRDD in SparkSQL


A SchemaRDD is an RDD of Row objects, each representing a record. A SchemaRDD also knows the schema (i.e., data fields) of its rows. While SchemaRDDs look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema.

The recommended entry point is the HiveContext, which provides access to HiveQL and other Hive-dependent functionality. The more basic SQLContext provides a subset of the Spark SQL support that does not depend on Hive.

Once we have constructed an instance of the HiveContext, we can import its implicit conversions. These implicits are used to convert RDDs with the required type information into Spark SQL’s specialized RDDs for querying.


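// Import the HiveContext class (requires Spark built with Hive support)
import org.apache.spark.sql.hive.HiveContext
// Create a Spark SQL HiveContext from an existing SparkContext (sc)
val hiveCtx = new HiveContext(sc)
// Import the implicit conversions
import hiveCtx._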

You can register any SchemaRDD as a temporary table with the registerTempTable() method and then query it via HiveContext.sql or SQLContext.sql. Temp tables are local to the HiveContext or SQLContext being used, and go away when your application exits.
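As a rough sketch of registering and querying a temp table, assuming Spark 1.1 or 1.2 (the releases where SchemaRDD and registerTempTable() are available) with the spark-hive module on the classpath. The Person case class, the table name people, and the sample rows are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical record type; a top-level case class supplies the schema.
case class Person(name: String, age: Int)

object TempTableExample {
  // Registers a small in-memory dataset as a temp table and counts the adults.
  def adultCount(sc: SparkContext): Long = {
    val hiveCtx = new HiveContext(sc)
    // Implicit RDD-to-SchemaRDD conversion (one of the implicits in `import hiveCtx._`)
    import hiveCtx.createSchemaRDD

    val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 17)))
    // The table name is visible only through this hiveCtx instance
    people.registerTempTable("people")

    hiveCtx.sql("SELECT name FROM people WHERE age >= 18").count()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TempTableExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(adultCount(sc))
    finally sc.stop()
  }
}
```

The same calls should work with a plain SQLContext if Hive support is not needed.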

Row objects represent records inside SchemaRDDs, and are simply fixed-length arrays of fields. In Scala/Java, Row objects have a number of getter functions to obtain the value of each field given its index.
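The getters can be sketched as follows, again assuming Spark 1.1 or 1.2. The City case class and its data are hypothetical; getString and getInt are the typed getters, and the index is the column's position in the SELECT list.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical record type used only for this example.
case class City(name: String, population: Int)

object RowGetterExample {
  // Reads each Row by field index and formats it as a string.
  def describe(sc: SparkContext): Seq[String] = {
    val hiveCtx = new HiveContext(sc)
    import hiveCtx.createSchemaRDD

    sc.parallelize(Seq(City("Oslo", 600000))).registerTempTable("cities")
    val rows = hiveCtx.sql("SELECT name, population FROM cities").collect()

    rows.map { row =>
      val name = row.getString(0)    // field 0 is a string column
      val population = row.getInt(1) // field 1 is an integer column
      s"$name: $population"
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RowGetterExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try describe(sc).foreach(println)
    finally sc.stop()
  }
}
```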

Caching

Caching in Spark SQL works a bit differently from caching regular RDDs. Since we know the types of each column, Spark is able to store the data more efficiently. To make sure that we cache using the memory-efficient representation, rather than the full objects, we should use the special hiveCtx.cacheTable("tableName") method. When caching a table, Spark SQL represents the data in an in-memory columnar format. This cached table will remain in memory only for the life of our driver program, so if it exits we will need to recache our data.
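A minimal sketch of table caching, assuming Spark 1.1 or 1.2. The Event case class and the table name events are invented for the example; cacheTable and uncacheTable are the context methods that take the registered table name.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical record type used only for this example.
case class Event(id: Int, kind: String)

object CacheTableExample {
  // Caches a temp table in columnar form, queries it twice, then releases it.
  def clickCounts(sc: SparkContext): (Long, Long) = {
    val hiveCtx = new HiveContext(sc)
    import hiveCtx.createSchemaRDD

    val events = sc.parallelize(Seq(Event(1, "click"), Event(2, "view"), Event(3, "click")))
    events.registerTempTable("events")

    // Cache by table name so Spark SQL uses its in-memory columnar format
    hiveCtx.cacheTable("events")
    val first = hiveCtx.sql("SELECT id FROM events WHERE kind = 'click'").count()
    val second = hiveCtx.sql("SELECT id FROM events WHERE kind = 'click'").count()

    // Free the memory once the table is no longer needed
    hiveCtx.uncacheTable("events")
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CacheTableExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(clickCounts(sc))
    finally sc.stop()
  }
}
```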

Spark SQL’s JDBC server corresponds to the HiveServer2 in Hive. It is also known as the “Thrift server” since it uses the Thrift communication protocol. Note that the JDBC server requires Spark be built with Hive support.

Performance Tuning

Spark SQL is able to use the knowledge of types to represent our data more efficiently. When caching data, Spark SQL uses in-memory columnar storage. This not only takes up less space when cached, but if our subsequent queries depend only on subsets of the data, Spark SQL minimizes the data read.

Predicate push-down allows Spark SQL to move some parts of our query “down” to the engine we are querying. If we wanted to read only certain records in Spark, the standard way to handle this would be to read in the entire dataset and then execute a filter on it. However, in Spark SQL, if the underlying data store supports retrieving only subsets of the key range, or another restriction, Spark SQL is able to push the restrictions in our query down to the data store, resulting in potentially much less data being read.
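The idea can be illustrated with a toy model in plain Scala. This is not Spark's implementation: ToyStore is a hypothetical key-ordered store that counts the records it reads, so the two strategies can be compared.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical key-ordered data store that tracks how many records it reads.
final class ToyStore(data: TreeMap[Int, String]) {
  private var read = 0
  def recordsRead: Int = read

  // Full scan: every record is read and handed to the caller.
  def scanAll(): Seq[(Int, String)] = {
    read += data.size
    data.toSeq
  }

  // Range scan: only records with keys in [from, until) are read.
  def scanRange(from: Int, until: Int): Seq[(Int, String)] = {
    val slice = data.range(from, until)
    read += slice.size
    slice.toSeq
  }
}

object PushDownSketch {
  // Returns (records read without push-down, records read with it, same result?).
  def compare(): (Int, Int, Boolean) = {
    val rows = TreeMap((1 to 1000).map(k => k -> s"row$k"): _*)

    // Without push-down: read everything, then filter in the query engine
    val full = new ToyStore(rows)
    val filtered = full.scanAll().filter { case (k, _) => k >= 10 && k < 20 }

    // With push-down: hand the key restriction to the store itself
    val ranged = new ToyStore(rows)
    val pushed = ranged.scanRange(10, 20)

    (full.recordsRead, ranged.recordsRead, filtered == pushed)
  }

  def main(args: Array[String]): Unit = println(compare())
}
```

Both strategies return the same ten rows, but the full scan reads all 1,000 records while the range scan reads only 10.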



A few options warrant special attention.

First is spark.sql.codegen, which causes Spark SQL to compile each query to Java bytecode before running it. Codegen can make long queries or frequently repeated queries substantially faster, because it generates specialized code to run them. However, in a setting with very short (1–2 seconds) ad hoc queries, it may add overhead as it has to run a compiler for each query. Codegen is also still experimental, but we recommend trying it for any workload with large queries, or with the same query repeated over and over. Note that the first few runs of codegen will be especially slow as it needs to initialize its compiler, so you should run four to five queries before measuring its overhead.

The second option you may need to tune is spark.sql.inMemoryColumnarStorage.batchSize. When caching SchemaRDDs, Spark SQL groups together the records in the RDD in batches of the size given by this option (default: 1000), and compresses each batch. Very small batch sizes lead to low compression, but on the other hand, very large sizes can also be problematic, as each batch might be too large to build up in memory. If the rows in your tables are large (i.e., contain hundreds of fields or contain string fields that can be very long, such as web pages), you may need to lower the batch size to avoid out-of-memory errors. If not, the default batch size is likely fine, as there are diminishing returns for extra compression when you go beyond 1,000 records.
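Both options can be set on the context before running queries. The sketch below assumes Spark 1.1 or 1.2, where setConf takes a string key and a string value; the batch size of 500 is only an illustrative value for tables with very wide rows, not a recommendation.

```scala
import org.apache.spark.sql.hive.HiveContext

object TuningExample {
  // Applies the two tuning options discussed above to an existing HiveContext.
  def tune(hiveCtx: HiveContext): Unit = {
    // Experimental: compile queries to bytecode; helps long or repeated queries
    hiveCtx.setConf("spark.sql.codegen", "true")
    // Illustrative value: smaller batches for very large rows (default is 1000)
    hiveCtx.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "500")
  }
}
```

The same keys can also be passed at launch time, for example with --conf on spark-submit.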

1 comment:

  1. I am using the Spark Thrift server. How can I get better performance with it? I am caching the whole table in memory, but it still does not look like my queries are hitting the cached table.
    Please email your suggestion to my email.
    babloo_chauhan@hotmail.com
