When it comes to Big Data processing in the cloud versus on-premises, one of the fundamental differences is how data is stored and accessed. Overlooking this difference between, for example, AWS S3 in the cloud and HDFS on-prem leads to a suboptimal service, to say the least. At Qubole, recognizing these differences is part of our product development process, and we have implemented significant performance enhancements in the leading Big Data processing engines supported by Qubole Data Service (QDS), such as Hadoop, Hive, and Presto.
In this post, we will take a look at one such optimization we’ve implemented with regard to AWS S3 listings. This optimization enables split computations to run significantly faster (more on this below) on SparkSQL queries and is generally available in Apache Spark as a Service offered through QDS.
Split Computation Optimization
Performance Boost
We’ve noticed a significant performance boost as a result of this optimization, especially when Hive tables have a large number of partitions. For instance, we executed a simple select query on a table with 5,462 partitions, each containing one file. Here are the results:
Metric | Without optimization (in secs) | With optimization (in secs) | Improvement |
Query execution | 782 | 120 | 6.5x |
S3 listing | 650 | 8 | 81x |
The numbers show where the gain comes from: S3 listing time drops from 650 seconds to 8, and that reduction accounts for most of the improvement in overall query execution time.
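As a quick check on the table, the improvement factors and the share of the saving that comes from S3 listing can be recomputed from the four reported timings. The snippet below is plain arithmetic on those numbers; the variable names are ours and nothing here is measured independently.

```python
# Timings in seconds, copied from the results table.
query_without, query_with = 782, 120
listing_without, listing_with = 650, 8

# Speedup = time without the optimization / time with it.
query_speedup = query_without / query_with
listing_speedup = listing_without / listing_with

# How much of the end-to-end saving is explained by the faster S3 listing.
query_saved = query_without - query_with
listing_saved = listing_without - listing_with
listing_share = listing_saved / query_saved

print(f"query speedup:   {query_speedup:.1f}x")
print(f"listing speedup: {listing_speedup:.0f}x")
print(f"share of saved time from listing: {listing_share:.0%}")
```

On these figures, the listing step alone explains roughly 97% of the time saved on the query.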
Enabling it in QDS
In Spark versions 1.5.1 and above, you can enable this optimization by adding the following to the Spark application’s command line:
--conf spark.sql.qubole.split.computation=true
Once enabled, QDS will leverage the bulk S3 listing optimization to perform faster split computation on SparkSQL queries. This benefits partitioned Hive tables in all formats that depend on FileInputFormat. It does not affect unpartitioned tables or Parquet/ORC tables, because those have their own way of listing S3 files.
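If the command line is assembled by a script, the flag can be added in one place. The following is a minimal sketch that assumes the application is launched through spark-submit; the helper name build_spark_submit and the application path are hypothetical, and only the configuration key comes from this post.

```python
import shlex

# Configuration key and value taken from this post.
SPLIT_COMPUTATION_CONF = "spark.sql.qubole.split.computation=true"

def build_spark_submit(app_path, extra_conf=()):
    """Return the argv list for a spark-submit call with the optimization enabled."""
    argv = ["spark-submit", "--conf", SPLIT_COMPUTATION_CONF]
    for setting in extra_conf:
        # Any additional key=value settings are passed the same way.
        argv += ["--conf", setting]
    argv.append(app_path)
    return argv

# Hypothetical application path, for illustration only.
command = build_spark_submit("my_query_job.py")
print(shlex.join(command))
```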
How Does It Work
Spark’s TableReader creates one HadoopRDD per input partition. Each time this happens, QDS adds the partition directory to a list. Later, the DAGScheduler calls getPartitions on each HadoopRDD as part of discovering dependencies. When getPartitions is called on the first HadoopRDD, QDS issues a bulk S3 listing and populates a FileStatus cache. All subsequent calls to getPartitions are served from this cache, which speeds up the split computation.
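The flow above can be illustrated with a small toy model. This is a sketch under stated assumptions, not Qubole's implementation: the class BulkListingCache, its method names, and the fake_list function standing in for S3 are all hypothetical, and the bulk listing is assumed to be a single prefix listing under the partitions' common parent directory.

```python
import posixpath
from collections import defaultdict

class BulkListingCache:
    """Toy model: collect partition directories, list once, serve the rest from cache."""

    def __init__(self, list_prefix):
        self._list_prefix = list_prefix  # callable: prefix -> iterable of object keys
        self._registered = []            # partition directories seen so far
        self._cache = None               # directory -> list of file keys

    def register(self, partition_dir):
        # Corresponds to creating one RDD per input partition.
        self._registered.append(partition_dir.rstrip("/") + "/")

    def files_for(self, partition_dir):
        # Corresponds to getPartitions: only the first call triggers a listing.
        if self._cache is None:
            self._populate()
        return self._cache.get(partition_dir.rstrip("/") + "/", [])

    def _populate(self):
        if not self._registered:
            self._cache = {}
            return
        # One listing under the common parent instead of one per partition.
        common = posixpath.commonpath(self._registered)
        prefix = common + "/" if common else ""
        by_dir = defaultdict(list)
        for key in self._list_prefix(prefix):
            by_dir[key.rsplit("/", 1)[0] + "/"].append(key)
        self._cache = {d: by_dir.get(d, []) for d in self._registered}

# Fake object store with three partitions of one file each (made-up keys).
days = range(1, 4)
objects = [f"warehouse/events/dt=2016-01-{d:02d}/part-00000" for d in days]
calls = []

def fake_list(prefix):
    calls.append(prefix)  # record every listing request
    return [k for k in objects if k.startswith(prefix)]

cache = BulkListingCache(fake_list)
for d in days:
    cache.register(f"warehouse/events/dt=2016-01-{d:02d}")
for d in days:
    print(cache.files_for(f"warehouse/events/dt=2016-01-{d:02d}"))
print("listing calls:", len(calls))
```

In this model, three partitions cost one listing request rather than three. With thousands of partitions, the same pattern replaces thousands of per-directory listings with a single bulk pass.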
Stay Tuned
In the future, we will:
- Post a more in-depth follow-up blog describing the technical details of our implementation
- Extend this optimization to Parquet and ORC tables
- Improve performance of partition recovery in SparkSQL
- Improve performance of INSERT-OVERWRITE queries