Shark Jumps Big Data (at SIGMOD)

Reynold Xin

Given all the excitement around Big Data recently, it’s not surprising that some analysts and bloggers have begun claiming that the Big Data meme has “Jumped the Shark”. In the AMPLab we believe this is far from true; in fact, we hold exactly the opposite view: there remains a need for new data analytics frameworks that can scale to massive data sizes. For this reason, we have developed the Shark system for query processing on Big Data.

Shark (whose name combines Spark and Hive) is a data warehouse system compatible with Apache Hive. Built on AMPLab’s Spark system, Shark offers two chief advantages over Hive: performance gains, through both smart query optimization and caching data in a cluster’s memory, and integration with Spark for iterative machine learning.

We will be doing a demo of the Shark system on a 100-node EC2 cluster this week at the SIGMOD conference in Phoenix.  In the demo we will interactively mine 16 months of Wikipedia hourly traffic logs.  If you are attending the conference, please drop by and say “Hi!”  (Update 5/25/12: Shark Wins Best Demo Award at SIGMOD 2012)

Performance

Analytical queries usually focus on a particular subset or time window. For example, a query might run over HTTP logs from the previous month, touching only the (small) dimension tables and a small portion of the fact table. These queries exhibit strong temporal locality, and in many cases it is plausible to fit the working set into a cluster’s memory. Shark allows users to exploit this temporal locality by caching their working set of data, or in database terms, by creating in-memory materialized views. Common data types can be cached in a columnar format (as arrays of Java primitives), which has little storage overhead, is very efficient for Java garbage collection, and provides maximum performance (an order of magnitude faster than reading data from disk).
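As a concrete sketch, here is what caching a working set might look like. The “_cached” table-name suffix is Shark’s convention for materializing a table in cluster memory; the table and column names below are hypothetical, and runSql is an assumed helper for issuing a HiveQL statement from a Shark session:

// Cache last month's logs in memory. The "_cached" suffix tells Shark to
// materialize the table in the cluster's memory, in columnar format.
// (Hypothetical table and columns; runSql is an assumed helper for
// issuing HiveQL statements.)
runSql("""
  CREATE TABLE logs_last_month_cached AS
  SELECT * FROM logs WHERE ctime >= '2012-04-01'
""")

// Subsequent queries over the cached table read from memory, not disk.
val pageCounts = sql2rdd(
  "SELECT page, count(*) FROM logs_last_month_cached GROUP BY page")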

Additionally, Shark employs a number of optimization techniques, such as limit pushdown and hash-based shuffle, that can provide significant speedups in query processing, as the sketch below illustrates.
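For instance, with limit pushdown an exploratory query like the following can stop scanning as soon as enough rows have been produced, rather than materializing a full scan and truncating it at the end (hypothetical table and columns, for illustration only):

// Limit pushdown lets each scan task stop early once the LIMIT is
// satisfied, instead of scanning its entire partition.
val sample = sql2rdd("SELECT * FROM logs WHERE status = 404 LIMIT 100")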

Integration with Spark for Machine Learning

Consider the following program to run k-means clustering in Shark:

// k-means implemented in Spark
def kmeans(points: RDD[Point], k: Int): Map[Int, Point] = {
  // Initialize the centroids randomly.
  var centroids = (0 until k).map(i => (i, Point.random())).toMap
  // Run a fixed number of iterations.
  for (iter <- 1 to 10) {
    // Assign each point to its closest centroid, then move each
    // centroid to the mean of the points assigned to it.
    centroids = points.groupBy(p => closestCentroid(p, centroids))
      .map { case (id, pts) => (id, pts.reduce(_ + _) / pts.size) }
      .collectAsMap().toMap
  }
  centroids
}

// Use SQL to select the "young" users.
val youngUsers = sql2rdd("SELECT * FROM users WHERE age < 20")
println(youngUsers.count)
val featureMatrix = youngUsers.mapRows(extractFeatures)
kmeans(featureMatrix, 10)
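
The snippet above assumes a Point type with vector arithmetic and a closestCentroid function; neither is part of Shark or Spark, but a minimal sketch of these helpers might look like this:

// Hypothetical helpers assumed by the k-means example above.
case class Point(x: Double, y: Double) {
  def +(that: Point) = Point(x + that.x, y + that.y)
  def /(d: Int) = Point(x / d, y / d)
  def distanceTo(that: Point) = {
    val dx = x - that.x
    val dy = y - that.y
    math.sqrt(dx * dx + dy * dy)
  }
}

object Point {
  def random() = Point(math.random, math.random)
}

// Return the id of the centroid nearest to the given point.
def closestCentroid(p: Point, centroids: Map[Int, Point]): Int =
  centroids.minBy { case (_, c) => p.distanceTo(c) }._1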

We allow users to exploit Spark’s inherent efficiency at iterative algorithms by providing a set of APIs that turn a SQL query into an RDD (a collection of records distributed in a cluster’s memory). With this API, users can explore data with SQL while expressing more sophisticated machine learning algorithms in Spark. These machine learning algorithms run on the same set of distributed workers and can share the same distributed memory as the query processor. This enables much more efficient data pipelines and provides a unified system for data analysis using both SQL and sophisticated statistical learning functions.

Compatibility with Apache Hive

Shark achieves its compatibility with Hive by reusing Hive code as much as possible: the query language, the metastore, serializers and deserializers, and user-defined function interfaces. It takes the logical query plan generated by the Hive parser and swaps out the execution engine: where Hive executes the plan as Hadoop MapReduce jobs, Shark generates its own physical plan consisting of operators written using Spark. This means that with Shark you can run your existing HiveQL queries on your Hive warehouse without modifying the data, while enjoying the benefits of caching, better optimization, and the Spark API for machine learning.
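For example, a query written for Hive, like the hypothetical one below, runs unchanged on Shark; issued through sql2rdd, its result is also available as an RDD for further processing in Spark:

// An unmodified HiveQL query (hypothetical tables and columns) issued
// through Shark. The same statement would run as-is on Hive.
val topPages = sql2rdd("""
  SELECT page_name, sum(page_views) AS views
  FROM wikistats
  GROUP BY page_name
  ORDER BY views DESC
  LIMIT 10
""")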

Again, we will be showcasing Shark at SIGMOD. Please check it out!