Click Here for the previous version of the benchmark
Several analytic frameworks have been announced in the last year. Among them are inexpensive data-warehousing solutions based on traditional Massively Parallel Processor (MPP) architectures (Redshift), systems which impose MPP-like execution engines on top of Hadoop (Impala, HAWQ) and systems which optimize MapReduce to improve performance on analytical workloads (Shark, Stinger/Tez). This benchmark provides quantitative and qualitative comparisons of five systems. It is entirely hosted on EC2 and can be reproduced directly from your computer.
This remains a work in progress and will evolve to include additional frameworks and new capabilities. We welcome contributions.
This benchmark is not intended to provide a comprehensive overview of the tested platforms. We are aware that by choosing default configurations we have excluded many optimizations. The choice of a simple storage format, compressed SequenceFile, omits optimizations included in columnar formats such as ORCFile and Parquet. For now, we've targeted a simple comparison between these systems with the goal that the results are understandable and reproducible.
This benchmark measures response time on a handful of relational queries: scans, aggregations, joins, and UDF's, across different data sizes. Keep in mind that these systems have very different sets of capabilities. MapReduce-like systems (Shark/Hive) target flexible and large-scale computation, supporting complex User Defined Functions (UDF's), tolerating failures, and scaling to thousands of nodes. Traditional MPP databases are strictly SQL compliant and heavily optimized for relational queries. The workload here is simply one set of queries that most of these systems these can complete.
Our dataset and queries are inspired by the benchmark contained in a comparison of approaches to large scale analytics. The input data set consists of a set of unstructured HTML documents and two SQL tables which contain summary information. It was generated using Intel's Hadoop benchmark tools and data sampled from the Common Crawl document corpus. There are three datasets with the following schemas:
|Unstructured HTML documents||Lists websites and their page rank||Stores server logs for each web page|
Query 3 is a join query with a small result set, but varying sizes of joins.
|Framework||Instance Type||Memory||Storage||Virtual Cores||$/hour|
|Impala, Hive, Tez, Shark||m2.4xlarge||68.4 GB||1680GB (2HDD)||8||1.640|
|Redshift||dw.hs1.xlarge||15 GB||2 TB (3HDD)||2||.85|
|Framework||Instance Type||Instances||Memory||Storage||Virtual Cores||Cluster $/hour|
|Impala, Hive, Tez, Shark||m2.4xlarge||5||342 GB||8.4 TB (10HDD)||40||$8.20|
|Redshift||dw.hs1.xlarge||10||150 GB||20 TB (30HDD)||20||$8.50|
We launch EC2 clusters and run each query several times. We report the median response time here. Except for Redshift, all data is stored on HDFS in compressed SequenceFile format. Each query is run with seven frameworks:
|Redshift||Amazon Redshift with default options.|
|Shark - disk||Input and output tables are on-disk compressed with gzip. OS buffer cache is cleared before each run.|
|Impala - disk||Input and output tables are on-disk compressed with snappy. OS buffer cache is cleared before each run.|
|Shark - mem||Input tables are stored in Spark cache. Output tables are stored in Spark cache.|
|Impala - mem||Input tables are coerced into the OS buffer cache. Output tables are on disk (Impala has no notion of a cached table).|
|Hive||Hive on HDP 2.0.6 with default options. Input and output tables are on disk compressed with snappy. OS buffer cache is cleared before each run.|
|Tez||Tez with the configuration parameters specified here. Input and output tables are on disk compressed with snappy. OS buffer cache is cleared before each run.|
SELECT pageURL, pageRank FROM rankings WHERE pageRank > X
This query scans and filters the dataset and stores the results.
This query primarily tests the throughput with which each framework can read and write table data. The best performers are Impala (mem) and Shark (mem) which see excellent throughput by avoiding disk. For on-disk data, Redshift sees the best throughput for two reasons. First, the Redshift clusters have more disks and second, Redshift uses columnar compression which allows it to bypass a field which is not used in the query. Shark and Impala scan at HDFS throughput with fewer disks.
Both Shark and Impala outperform Hive by 3-4X due in part to more efficient task launching and scheduling. As the result sets get larger, Impala becomes bottlenecked on the ability to persist the results back to disk. Nonetheless, since the last iteration of the benchmark Impala has improved its performance in materializing these large result-sets to disk.
Tez sees about a 40% improvement over Hive in these queries. This is in part due to the container pre-warming and reuse, which cuts down on JVM initialization time.
SELECT SUBSTR(sourceIP, 1, X), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, X)
This query applies string parsing to each input tuple then performs a high-cardinality aggregation.
Redshift's columnar storage provides greater benefit than in Query 1 since several columns of the
UserVistits table are un-used. While Shark's in-memory tables are also columnar, it is bottlenecked here on the speed at which it evaluates the
SUBSTR expression. Since Impala is reading from the OS buffer cache, it must read and decompress entire rows. Unlike Shark, however, Impala evaluates this expression using very efficient compiled code. These two factors offset each other and Impala and Shark achieve roughly the same raw throughput for in memory tables. For larger result sets, Impala again sees high latency due to the speed of materializing output tables.
SELECT sourceIP, totalRevenue, avgPageRank FROM (SELECT sourceIP, AVG(pageRank) as avgPageRank, SUM(adRevenue) as totalRevenue FROM Rankings AS R, UserVisits AS UV WHERE R.pageURL = UV.destURL AND UV.visitDate BETWEEN Date(`1980-01-01') AND Date(`X') GROUP BY UV.sourceIP) ORDER BY totalRevenue DESC LIMIT 1
This query joins a smaller table to a larger table then sorts the results.
When the join is small (3A), all frameworks spend the majority of time scanning the large table and performing date comparisons. For larger joins, the initial scan becomes a less significant fraction of overall response time. For this reason the gap between in-memory and on-disk representations diminishes in query 3C. All frameworks perform partitioned joins to answer this query. CPU (due to hashing join keys) and network IO (due to shuffling data) are the primary bottlenecks. Redshift has an edge in this case because the overall network capacity in the cluster is higher.
CREATE TABLE url_counts_partial AS SELECT TRANSFORM (line) USING "python /root/url_count.py" as (sourcePage, destPage, cnt) FROM documents; CREATE TABLE url_counts_total AS SELECT SUM(cnt) AS totalCount, destPage FROM url_counts_partial GROUP BY destPage;
|Query 4 (phase 1)||Query 4 (phase 2)||Query 4 (total)|
This query calls an external Python function which extracts and aggregates URL information from a web crawl dataset. It then aggregates a total count per URL.
Impala and Redshift do not currently support calling this type of UDF, so they are omitted from the result set. Impala UDFs must be written in Java or C++, where as this script is written in Python. The performance advantage of Shark (disk) over Hive in this query is less pronounced than in 1, 2, or 3 because the shuffle and reduce phases take a relatively small amount of time (this query only shuffles a small amount of data) so the task-launch overhead of Hive is less pronounced. Also note that when the data is in-memory, Shark is bottlenecked by the speed at which it can pipe tuples to the Python process rather than memory throughput. This makes the speedup relative to disk around 5X (rather than 10X or more seen in other queries).
These numbers compare performance on SQL workloads, but raw performance is just one of many important attributes of an analytic framework. The reason why systems like Hive, Impala, and Shark are used is because they offer a high degree of flexibility, both in terms of the underlying format of the data and the type of computation employed. Below we summarize a few qualitative points of comparison:
|System||SQL variant||Execution engine||UDF Support||Mid-query fault tolerance||Open source||Commercial support||HDFS Compatible|
|Hive||Hive QL (HQL)||MapReduce||Yes||Yes||Yes||Yes||Yes|
|Tez||Hive QL (HQL)||Tez||Yes||Yes||Yes||Yes||Yes|
|Shark||Hive QL (HQL)||Spark||Yes||Yes||Yes||Yes||Yes|
|Impala||Some HQL + some extensions||DBMS||Yes (Java/C++)||No||Yes||Yes||Yes|
|Redshift||Full SQL 92 (?)||DBMS||No||No||No||Yes||No|
We would like to include the columnar storage formats for Hadoop-based systems, such as Parquet and RC file. We would also like to run the suite at higher scale factors, using different types of nodes, and/or inducing failures during execution. Finally, we plan to re-evaluate on a regular basis as new versions are released.
We wanted to begin with a relatively well known workload, so we chose a variant of the Pavlo benchmark. This benchmark is heavily influenced by relational queries (SQL) and leaves out other types of analytics, such as machine learning and graph processing. The largest table also has fewer columns than in many modern RDBMS warehouses. In future iterations of this benchmark, we may extend the workload to address these gaps.
This benchmark is not an attempt to exactly recreate the environment of the Pavlo at al. benchmark. Instead, it draws on that benchmark for inspiration in the dataset and workload. The most notable differences are as follows:
We've started with a small number of EC2-hosted query engines because our primary goal is producing verifiable results. Over time we'd like to grow the set of frameworks. We actively welcome contributions!
We've tried to cover a set of fundamental operations in this benchmark, but of course, it may not correspond to your own workload. The prepare scripts provided with this benchmark will load sample data sets into each framework. From there, you are welcome to run your own types of queries against these tables. Because these are all easy to launch on EC2, you can also load your own datasets.
For now, no. The idea is to test "out of the box" performance on these queries even if you haven't done a bunch of up-front work at the loading stage to optimize for specific access patterns. For this reason we have opted to use simple storage formats across Hive, Impala and Shark benchmarking.
That being said, it is important to note that the various platforms optimize different use cases. As it stands, only Redshift can take advantage of its columnar compression. However, the other platforms could see improved performance by utilizing a columnar storage format. Specifically, Impala is likely to benefit from the usage of the Parquet columnar file format.
We may relax these requirements in the future.
We did, but the results were very hard to stabilize. The reason is that it is hard to coerce the entire input into the buffer cache because of the way Hive uses HDFS: Each file in HDFS has three replicas and Hive's underlying scheduler may choose to launch a task at any replica on a given run. As a result, you would need 3X the amount of buffer cache (which exceeds the capacity in these clusters) and or need to have precise control over which node runs a given task (which is not offered by the MapReduce scheduler).
We plan to run this benchmark regularly and may introduce additional workloads over time. We welcome the addition of new frameworks as well. The only requirement is that running the benchmark be reproducible and verifiable in similar fashion to those already included. The best place to start is by contacting Patrick Wendell from the U.C. Berkeley AMPLab.
Since Redshift, Shark, Hive, and Impala all provide tools to easily provision a cluster on EC2, this benchmark can be easily replicated.
To allow this benchmark to be easily reproduced, we've prepared various sizes of the input dataset in S3. The scale factor is defined such that each node in a cluster of the given size will hold ~25GB of the
UserVisits table, ~1GB of the
Rankings table, and ~30GB of the web crawl, uncompressed. The datasets are encoded in
SequenceFile format along with corresponding compressed versions. They are available publicly at
|S3 Suffix||Scale Factor||
|/1node/||1||18 Million||1.28GB||155 Million||25.4GB||29.0GB|
|/5nodes/||5||90 Million||6.38GB||775 Million||126.8GB||136.9GB|
$> ec2/spark-ec2 -s 5 -k [KEY PAIR NAME] -i [IDENTITY FILE] --hadoop-major-version=2 -t "m2.4xlarge" launch [CLUSTER NAME]
NOTE: You must set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
Change: ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v2") To: ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/ahirreddy/spark-ec2.git -b ext4-update")
Run the following commands on each node provisioned by the Cloudera Manager. These commands must be issued after an instance is provisioned but before services are installed.
dev=/dev/xvdb sudo umount $dev sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 $dev sudo mount -o defaults,noatime,nodiratime $dev dev=/dev/xvdc sudo mkdir /data0 sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 $dev sudo mount -o defaults,noatime,nodiratime $dev sudo mount -t ext4 -o defaults,noatime,nodiratime $dev /data0
By default our HDP launch scripts will format the underlying filesystem as Ext4, no additional steps are required.
This command will launch and configure the specified number of slaves in addition to a Master and an Ambari host.
$> AWS_ACCESS_KEY_ID=[AWS ID] AWS_SECRET_ACCESS_KEY=[AWS SECRET] ./prepare-hdp.sh --slaves=N --key-pair=[INSTANCE KEYPAIR] --identity-file=[SSH PRIVATE KEY] --instance-type=[INSTANCE TYPE] launch [CLUSTER NAME]
Once complete, it will report both the internal and external hostnames of each node.
To install Tez on this cluster, use the following command. It will remove the ability to use normal Hive.
$> ./prepare-benchmark.sh --hive-tez --hive-host [MASTER REPORTED BY SETUP SCRIPT] --hive-identity-file [SSH PRIVATE KEY]
Scripts for preparing data are included in the benchmark github repo. Use the provided
prepare-benchmark.sh to load an appropriately sized dataset into the cluster.
Here are a few examples showing the options used in this benchmark