How to process a million songs in seconds

Appuri is hiring data scientists, Java/Scala developers, and front-end devs proficient with modern frameworks like AngularJS. Please drop me a line at bilal at appuri dot com if you are interested in learning more.

PS: This excellent blog post inspired us to try Spark on the Million Song Dataset.

TL;DR

Spark with caching is really, really fast!

Why Spark?

At Appuri, we are building the next generation of game analytics and performance management. Games have a large ‘data exhaust’: millions of events generated by thousands of concurrent users add up to a LOT of data. Typically, game companies log events but don’t do anything interesting or useful with them. Running queries on billions of rows is a non-trivial task for anyone but data scientists. That’s where Appuri comes in: we allow anyone at game companies to query terabytes of game data and act on it in seconds, not hours.

Spark, from UC Berkeley, is part of our big data stack. In this post I’m going to walk you through setting up Spark on Amazon EC2 and using it to run lightning-fast queries on a million songs.

Hadoop distributes work across slave nodes, collects the results, and writes them out to a database or file. Spark works on top of your existing Hadoop infrastructure, but adds a couple of key enhancements:

Scala REPL. You write Hadoop jobs in Java. While there are abstractions on top of Hadoop like Hive and Pig, they don’t have the power and expressiveness of a true REPL. Spark gives you a full Scala REPL where you can explore data without the tedium of submitting jobs and waiting for them to finish.

Local caching using Resilient Distributed Datasets (RDDs). The ‘a-ha’ moment of Spark is a data structure called an RDD, which acts as a distributed cache. RDDs can be kept in memory between queries, so repeated or iterative queries are really fast. If you have ever done linear regression on Hadoop, Spark will be an order of magnitude faster (a quick sketch of the pattern follows below).
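Here’s a minimal sketch of the caching pattern, assuming a running Spark shell (where the SparkContext is available as sc) and a hypothetical HDFS path:

// Minimal RDD-caching sketch: the first action computes the dataset and
// keeps it in memory; later actions reuse the cached partitions.
val events = sc.textFile("hdfs:///some/dataset").cache()  // hypothetical path
events.count()  // first pass: reads from disk and populates the cache
events.count()  // second pass: served from memory, much faster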

The Data

For this blog post, we’re going to use the Million Song Dataset from Columbia University. The data is, unfortunately, in HDF5 format, which neither Hadoop nor Spark handles natively; Spark does, however, make it easy to parse TSV or CSV files. So we converted the data to TSV format and put it in an S3 bucket.

The schema of the data is documented here. There are predictable fields like artist_mbid, but the more interesting data is in fields like year (the year the song was released) and tempo.

The Challenge

The challenge is to explore the dataset and find interesting patterns in it. This sort of questioning is fundamentally possible only with a REPL, because you can try out different filters interactively without the submit-wait-getCoffee-resubmit loop of Hadoop :)

Let’s answer a few questions:

  • How many songs were released each year?
  • How many songs were produced between 1970 and 1980?
  • How many rock songs were produced between 1970 and 1980 in New Jersey?

Setup

Sign up for EC2

If you don’t have an EC2 account, sign up now. You also need an SSH keypair – follow these instructions to get this set up.

For the rest of the tutorial, I’ll assume your EC2 key is called ec2dev.pem and is stored in ~/.ssh.

Spin up the cluster

Visit the Spark Downloads page and download the latest version to your computer.

tar -xvzf spark-0.7.3-sources.tgz
cd spark-0.7.3/ec2
./spark-ec2 -k ec2dev -i ~/.ssh/ec2dev.pem -s 10 launch hotpants

This command launches a master and 10 slave nodes on a cluster named hotpants (use your own cluster name if you don’t like hotpants). Spark’s EC2 scripts are really nice; they even set up Ganglia monitoring. Time to go grab a cup of coffee. This will take a while!

Connect to the cluster

There are three ways to connect to and monitor your cluster.

Spark UI. Navigate to http://<master_dns>:8080, where master_dns is the DNS of your Spark master. Note that the deployment script prints out this value when it is finished. This UI shows a simple but useful view of each node in the cluster, including memory utilization and running jobs.

Ganglia UI. Navigate to http://<master_dns>:5080/ganglia to see low-level metrics like CPU, memory and I/O utilization for each node in the cluster.

SSH. Let’s SSH to the master node to start running queries:

ssh-add ~/.ssh/ec2dev.pem
ssh root@<master_dns>

Note that I use the ssh-add command to avoid having to provide the key to the ssh command.

Copy data to HDFS

Your Spark setup comes with HDFS (Hadoop Distributed File System). Let’s copy the dataset into HDFS. Before we do that, we need to set up Hadoop with access to our S3 bucket using the S3 Native file system. We could work off of S3 directly, but it’s faster to have the data available locally in the cluster:

vim /root/ephemeral-hdfs/conf/core-site.xml

Add this to the file and save:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>

We also need to start the Hadoop JobTracker and TaskTrackers, which distcp relies on:

/root/ephemeral-hdfs/bin/start-mapred.sh

Now we’re ready to copy data into HDFS:

/root/ephemeral-hdfs/bin/hadoop distcp s3n://<bucket>/ /msd

Go grab your next cup of coffee! This copy will take roughly an hour to complete (it took 50 minutes on my cluster).
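As an aside, once the S3 credentials above are in place you don’t strictly have to copy anything: Spark can read straight from S3. A minimal sketch, using the Spark shell we’ll launch in the next section and the same <bucket> placeholder (expect scans to be slower than against local HDFS, which is why we copy the data):

// Read the TSV files straight from S3 instead of HDFS. Every scan pays the
// S3 transfer cost, so for repeated queries the distcp into HDFS wins.
val songsFromS3 = sc.textFile("s3n://<bucket>/")
songsFromS3.take(1)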

Exploring the REPL

Now that the dataset is copied locally, we can start exploring it from the Spark REPL. There’s a catch we ran into when launching the REPL: by default, it launches in local mode, which means any work you do will run only on the machine the REPL is running on (you’ll be left wondering what the fuss over Spark is all about!).

To prevent this from happening, just run the Spark shell like this:

export MASTER=`cat /root/spark-ec2/cluster-url`

/root/spark/spark-shell

Once the Spark REPL launches, you will be at a prompt starting with scala>. Let’s try out our queries:

First, let’s take 10 records from the dataset:

val songs = sc.textFile("hdfs://<master_dns>:9000/msd")
val tenSongs = songs.take(10)
…

13/07/24 17:11:32 INFO spark.SparkContext: Job finished: take at <console>:14, took 1.514120493 s
tenSongs: Array[String] = Array(TRAAAUC128F428716F  22050   16816   0.628898526808  0.405014695008  ARA23XO1187B9AF18F  40.57885    Carteret, New Jersey -74.21956    fae2cd4c-f847-4cb3-b311-60c8906cdb0b         The Smithereens 61479   pop rock,alternative rock,power pop,indie rock,rock,hard rock,soft rock,british pop,delta blues,chicago blues,blues-rock,rockabilly,chanson,new wave,hip hop,pop rap,folk rock,classic rock,stoner rock,dub,garage rock,psychedelic rock,funk,college rock,grunge,blues,indie pop,united states,reggae,soundtrack,singer-songwriter,electro,pop,electronic,female vocalist,rap,alternative,80s,synthpop,american,indie,punk,folk,soul,90s,lo-fi,acoustic,    0.980739660227,0.984358036619,0.898633143514,0.954793007772,1.0,0.907538328714,0.792850232535,0.747263005002,0.747263005002,0.747263005002,0.7...

As you can see, this simply returns ten raw text rows from the dataset. Let’s turn each row into a Scala array of 54 elements:

val tenSongsParsed = tenSongs.map(row => row.split("\t"))
tenSongsParsed(0)
 res0: Array[java.lang.String] = Array(TRAAAUC128F428716F, 22050, 16816, 0.628898526808, 0.405014695008, ARA23XO1187B9AF18F, 40.57885, Carteret, New Jersey, -74.21956, fae2cd4c-f847-4cb3-b311-60c8906cdb0b, "", "", The Smithereens, 61479, pop rock,alternative rock,power pop,indie rock,rock,hard rock,soft rock,british pop,delta blues,chicago blues,blues-rock,rockabilly,chanson,new wave,hip hop,pop rap,folk rock,classic rock,stoner rock,dub,garage rock,psychedelic rock,funk,college rock,grunge,blues,indie pop,united states,reggae,soundtrack,singer-songwriter,electro,pop,electronic,female vocalist,rap,alternative,80s,synthpop,american,indie,punk,folk,soul,90s,lo-fi,acoustic,, 0.980739660227,0.984358036619,0.898633143514,0.954793007772,1.0,0.907538328714,0.792850232535,0.747263005002,0.747263...
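Splitting rows and remembering column indexes by hand gets old quickly. A small helper of our own (not part of the dataset or Spark; the case class and field names are hypothetical) can pull out the handful of fields this post uses, with column positions matching the queries below (track ID at index 0, location at index 7, year at index 53):

// Hypothetical helper: parse a tab-separated row into just the fields we query.
case class Song(trackId: String, location: String, year: Int)

def parseSong(row: String): Song = {
  val cols = row.split("\t")
  val year = try { cols(53).toInt } catch { case _: NumberFormatException => 0 }
  Song(cols(0), cols(7), year)
}

// e.g. val parsedSongs = songs.map(parseSong)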

Let’s count the total songs in our dataset:

songs.count
…
res2: Long = 991000

This full scan of 250 GB took 366 seconds to complete. That’s about what you would expect from a plain old Hadoop cluster of the same size, but remember we’re not utilizing any Spark caching yet.

Let’s count the number of songs by release year to further explore the power of the Spark REPL. Our map function emits a tuple of (year the song was produced, 1) and we use Spark’s reduceByKey transformation to sum the counts per year. The collect() action runs the job:

songs.map(row => { val parsedSong = row.split("\t"); (parsedSong(53), 1) }).reduceByKey((a,b) => a + b).collect()

res6: Array[(java.lang.String, Int)] = Array((0,484424), (1922,6), (1924,5), (1925,7), (1926,19), (1927,43), (1928,52), (1929,93), (1930,40), (1931,35), (1932,11), (1933,6), (1934,29), (1935,24), (1936,25), (1937,28), (1938,19), (1939,35), (1940,52), (1941,32), (1942,24), (1943,14), (1944,15), (1945,30), (1946,29), (1947,57), (1948,43), (1949,60), (1950,84), (1951,74), (1952,77), (1953,133), (1954,123), (1955,275), (1956,565), (1957,598), (1958,583), (1959,592), (1960,424), (1961,572), (1962,605), (1963,902), (1964,945), (1965,1120), (1966,1377), (1967,1718), (1968,1867), (1969,2211), (1970,2350), (1971,2131), (1972,2288), (1973,2596), (1974,2186), (1975,2482), (1976,2179), (1977,2502), (1978,2926), (1979,3108), (1980,3101), (1981,3167), (1982,3597), (1983,3386), (1984,3368), (1985,3578...

This job takes 2183 seconds to run. Note that ~50% of the songs are in year “0”, which means there’s no release date for them. I didn’t bother sorting the result array.
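If you do want the counts rolled up by decade and sorted, a small variation of the same query does it; the collected array is tiny, so sorting it on the driver is fine:

// Count songs per decade, dropping year 0 (unknown release date), then sort the small result array.
val byDecade = songs.map(row => row.split("\t")(53).toInt).filter(_ != 0).map(year => (year / 10 * 10, 1)).reduceByKey(_ + _).collect().sortBy(_._1)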

RDD Caching

These queries, while fun to write, did not demonstrate the power of Spark’s RDD caching. Let’s run a different query: first, let’s count the number of songs released between 1970 and 1980:

val songsCached = songs.filter(l => { val year = l.split("\t")(53).toInt; year >= 1970 && year < 1980 }).cache
songsCached.count
res7: Long = 24748

Note that we called the cache method at the very end. This instructs Spark to keep the RDD in memory once it has been computed, so later queries can reuse it. This first query runs in 351 seconds. Let’s run another query:

songsCached.count
res15: Long = 24748

The repeated query runs in only 1.9 seconds! That’s the magic of working on a cached RDD.
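Incidentally, cache is just shorthand for persist with the default in-memory storage level. If the filtered RDD doesn’t fit in RAM, a serialized storage level is one way to trade CPU for space. A sketch against current Spark releases (the import path and level names differed slightly in the 0.x line):

// Same filter as before, but stored serialized in memory: less RAM used,
// at the cost of deserialization work on each query.
import org.apache.spark.storage.StorageLevel
val songsCachedSer = songs.filter(l => { val year = l.split("\t")(53).toInt; year >= 1970 && year < 1980 }).persist(StorageLevel.MEMORY_ONLY_SER)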

Let’s check out what happens if you run a slightly different query:

songsCached.filter(l => { val year = l.split("\t")(53).toInt; year >= 1980 && year < 1990 }).count
res16: Long = 0

As expected, we found zero songs, because we are querying a dataset that includes only songs released from 1970 through 1979.

Let’s chain another filter: we will now count the songs released between 1970 and 1980 in New Jersey:

songsCached.filter(l => { val split = l.split("\t"); val year = split(53).toInt; val location = split(7); year >= 1970 && year < 1980 && location.contains("New Jersey") }).count

res17: Long = 61

This query took only 7 seconds.
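Our original question asked for rock songs in New Jersey. That’s one more chained filter away; a sketch, assuming the comma-separated artist terms sit at index 14 of a split row, as they appear to in the sample row we parsed earlier (this index is our guess, not the documented schema):

// Add a genre filter on top of the cached 1970s RDD. songsCached already
// contains only 1970-1979 releases, so no year check is needed here.
songsCached.filter(l => { val cols = l.split("\t"); cols(7).contains("New Jersey") && cols(14).split(",").contains("rock") }).count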

Optimizing Spark

There are many ways to optimize Spark. Start with your Scala code, making sure you create as few objects as possible. Next, Spark RDDs are very memory-intensive; your cluster will perform a lot better if you use high-memory instances.
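For example, rather than re-splitting every line inside each filter, you can split once, keep only the columns you query, and cache that much slimmer RDD. A rough sketch:

// Parse each line once, keep only (year, location), and cache the result so
// later filters skip the repeated string splitting over the full rows.
val slimSongs = songs.map(_.split("\t")).map(cols => (cols(53), cols(7))).cache()
slimSongs.filter(_._1 == "1979").count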

Terminating the cluster

When you’re done with your Spark cluster, run this command to tear it down. All your imported data will be lost!

./spark-ec2 -k ec2dev -i ~/.ssh/ec2dev.pem destroy hotpants

Final Thoughts

Spark is an exciting addition to the big data stack. There are a few upcoming features I am particularly excited about, including a job server that will make it easy to submit and track jobs.

If you want more Spark blog posts, let me know in the comments below!