Friday 17 July 2015

Error: Spark 1.4 Reading Array/Struct Data from Parquet

When a Parquet Hive table contains complex structures (e.g. array, map), Spark 1.4 either fails with the error below when reading the table, or returns NULL for the array column.

val data = sqlContext.parquetFile("/apps/hive/warehouse/orcfile")

data.take(10)

15/07/10 15:50:32 INFO scheduler.DAGScheduler: Job 0 failed: take at <console>:32, took 0.582019 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://000016_0
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at java.util.ArrayList.elementData(ArrayList.java:400)
        at java.util.ArrayList.get(ArrayList.java:413)


This is actually a known Parquet compatibility issue, which affects all Spark versions up to 1.4 and should be fixed in Spark 1.5.
Reason: so far, Spark SQL's Parquet support has only been compatible with parquet-avro, parquet-hive, and Impala, and it was done in an error-prone, ad-hoc way, because the Parquet format spec did not explicitly specify how complex types should be encoded when Spark SQL's Parquet support was first written.

The quick solution is to use ORC instead of Parquet. For example:


import org.apache.spark.sql.hive.HiveContext

// ORC support lives in Spark SQL's Hive module, so a HiveContext is required
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// ORC predicate push-down is off by default; enable it explicitly
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

val data = hiveContext.read.format("orc").load("/apps/hive/warehouse/orcfile")
 
Spark’s ORC data source supports complex data types (i.e., array, map, and struct) and provides read and write access to ORC files. It leverages Spark SQL’s Catalyst engine for common optimizations such as column pruning, predicate push-down, and partition pruning.
Note: ORC predicate push-down is disabled in Spark SQL by default and needs to be enabled explicitly, as done with spark.sql.orc.filterPushdown above.
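
For example, with filterPushdown enabled, a query like the one below (the column names are made up for illustration) lets Catalyst prune unused columns and push the filter down to the ORC reader:

// Hypothetical columns, only to illustrate column pruning and
// predicate push-down on the ORC table loaded above
val recent = data
  .select("id", "event_ts")               // only these columns are read from ORC
  .filter($"event_ts" > "2015-07-01")     // skips ORC stripes when
                                          // spark.sql.orc.filterPushdown is true
recent.show(10)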

Currently, the code for Spark SQL ORC support is under package org.apache.spark.sql.hive and must be used together with Spark SQL’s HiveContext. This is because ORC is still tightly coupled with Hive for now.
Spark’s ORC support requires only a HiveContext instance. HiveContext is an instance of the Spark SQL execution engine that integrates with data stored in Hive. The more basic SQLContext provides a subset of the Spark SQL support that does not depend on Hive.

Spark supports saving data out in a partitioned layout seamlessly, through the partitionBy method available during data source writes. 

person.write.format("orc").partitionBy("age").save("peoplePartitioned")
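
Reading the partitioned output back is symmetric: a filter on the partition column only scans the matching age=... directories. A sketch, using the peoplePartitioned path written above:

// "age" is rediscovered as a partition column from the directory layout
val peoplePartitioned = hiveContext.read.format("orc").load("peoplePartitioned")

// Partition pruning: only the age=25 directory is touched
peoplePartitioned.filter($"age" === 25).show()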

2. Field name rules in Parquet.

Remove " ,;{}()\n\t=" from column names for parquet support.

Attribute name "Client Id" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.        ;
org.apache.spark.sql.AnalysisException: Attribute name "EXTRACT NAME" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.; at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$.analysisRequire(CatalystSchemaConverter.scala:547) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$.checkFieldName(CatalystSchemaConverter.scala:533) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$$anonfun$convertToString$2.apply(ParquetTypesConverter.scala:69)
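
A simple workaround is to rename the offending columns (or alias them in a select) before writing to Parquet. The sketch below assumes a DataFrame df holding the columns named in the errors above; the output path is just an example:

// Strip the invalid characters from column names before the Parquet write
val cleaned = df
  .withColumnRenamed("Client Id", "client_id")
  .withColumnRenamed("EXTRACT NAME", "extract_name")

cleaned.write.format("parquet").save("/tmp/cleaned_parquet")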



Reference:
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/
https://databricks.com/blog/2015/07/16/joint-blog-post-bringing-orc-support-into-apache-spark.html
