Joseph Wilk

Things with code, creativity and computation.

A Little Bit of Pig

Currently, in the Science team at Songkick, I’ve been working with Apache Pig to generate lots of interesting metrics for our business intelligence. We use Amazon’s Elastic MapReduce and Pig so that we don’t have to run complex, long-running and intensive queries against our live database; we can run them on Amazon in a timely fashion instead. So let’s dive into Pig and how we use it at Songkick.com.

Pig (what’s with all these silly names)

The Apache project Pig is a data flow language designed for analysing large datasets. It provides a high-level platform for creating MapReduce programs that run on Hadoop. It is a little bit like SQL, but Pig programs are, by their structure, suitable for parallelisation, which is why they are great at handling very large data sets.

Here’s how we use Pig and Elastic MapReduce in the Science team at Songkick.

Data (Pig food)

Let’s start by uploading some huge and interesting data about Songkick’s artists onto S3. We start by dumping a table from MySQL (along with a lot of other tables) and then querying that data with Pig on Hadoop. While we could extract all the artist data by querying the live table, it’s actually faster to use mysqldump and dump the table as a TSV file.

For example, it took 35 minutes to dump our artist table with the SQL query ‘select * from artists’, but only 10 minutes to dump the entire table with mysqldump.

We format the table dump as TSV and push it to S3, as that makes it super easy to use Amazon’s Elastic MapReduce with Pig.

shell> mysqldump --user=joe --password --fields-optionally-enclosed-by='"' \
                 --fields-terminated-by='\t' --tab /tmp/path_to_dump/ songkick artist_trackings

Unfortunately this has to be run on the database machine, since mysqldump needs access to the local file system to save the data. If this is a problem for you, there is a Ruby script for dumping tables to TSV: http://github.com/apeckham/mysqltsvdump/blob/master/mysqltsvdump.rb
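
Pushing the resulting TSV files up to S3 is then just a copy. A minimal sketch using s3cmd follows; the dated bucket layout mirrors the paths the Pig script loads from later, but your own tool and naming may well differ:

shell> s3cmd put --recursive /tmp/path_to_dump/ \
         s3://songkick/db/trackings/$(date +%Y/%m/%d)/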

Launching (Pig catapult)

We will be using Amazon’s Elastic MapReduce to run our Pig scripts. We can start our job in interactive Pig mode, which allows us to SSH to the master node and run the Pig script line by line.
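
Creating the interactive job with the elastic-mapreduce command-line tool looks roughly like this (the flags are from memory of the old CLI, and the instance count and type are just examples; check elastic-mapreduce --help for your version):

shell> elastic-mapreduce --create --alive --name "pig-interactive-session" \
         --num-instances 4 --instance-type m1.large \
         --pig-interactive

The --alive flag keeps the cluster running between steps, so you can SSH in and experiment in the grunt shell.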

Examples (Dancing Pigs)

An important thing to note when running Pig scripts interactively is that they defer execution until they have to expose a result. This means you get nice schema checks and validations, helping to ensure your Pig script is valid, without actually executing it over your large dataset.

We are going to calculate the average number of users tracking an artist, only counting users who have logged in within the last 30 days.

This is what our Pig script is doing, step by step, with some sample data after each statement to illustrate the result.

The Pig script:

-- Define some useful dates we will use later
%default TODAYS_DATE `date +%Y/%m/%d`
%default 30_DAYS_AGO `date -d "$TODAYS_DATE - 30 day" +%Y-%m-%d`

-- Pig is smart enough, when given a folder, to go and find the files, decompress them if necessary and load them.
-- Note we have to specify the schema, as Pig cannot infer it from our TSV file.
trackings = LOAD 's3://songkick/db/trackings/$TODAYS_DATE/' AS (id:int, artist_id:int, user_id:int);
users = LOAD 's3://songkick/db/users/$TODAYS_DATE/' AS (id:int, username:chararray, last_logged_in_at:chararray);
trackings
<1, 1, 1>
<2, 1, 2>

users
<1,'josephwilk', '11/06/2012'>
<2,'elisehuard', '11/06/2012'>
<3,'tycho', '11/06/2010'>
-- Filter users to only those who logged in within the last 30 days.
-- Pig does not understand dates, so we just compare them as strings.
active_users = FILTER users BY last_logged_in_at >= '$30_DAYS_AGO';
active_users
<1,'josephwilk', '11/06/2012'>
<2,'elisehuard', '11/06/2012'>
-- Join the active users to their trackings.
active_users_and_trackings = JOIN active_users BY id, trackings BY user_id;

-- Group all the users tracking an artist so we can count them.
active_users_and_trackings_grouped = GROUP active_users_and_trackings BY trackings::artist_id;
<1, 1, {<1,'josephwilk', '11/06/2012'>, <2,'elisehuard', '11/06/2012'>}>
trackings_per_artist = FOREACH active_users_and_trackings_grouped GENERATE group, COUNT(active_users_and_trackings) AS number_of_trackings;
<{<1,'josephwilk', '11/06/2012'>, <2,'elisehuard', '11/06/2012'>}, 2>
-- Group all the counts so we can calculate the average.
all_trackings_per_artist = GROUP trackings_per_artist ALL;
<{{<1,'josephwilk', '11/06/2012'>, <2,'elisehuard', '11/06/2012'>}, 2}>
-- Calculate the average.
average_artist_trackings_per_active_user = FOREACH all_trackings_per_artist
  GENERATE '$TODAYS_DATE' AS dt, AVG(trackings_per_artist.number_of_trackings);
<{<'11/06/2012', 2>}>
-- Now we have done the work, store the result in S3.
STORE average_artist_trackings_per_active_user INTO
  's3://songkick/stats/average_artist_trackings_per_active_user/$TODAYS_DATE';

Debugging Pigs (Pig autopsy)

In an interactive Pig session there are two useful commands for debugging: DESCRIBE, to see the schema of a relation, and ILLUSTRATE, to see the schema with sample data:

DESCRIBE users;
users: {id:int, username:chararray, created_at:chararray, trackings:int}

ILLUSTRATE users;
----------------------------------------------------------------------
| users   | id: int | username:chararray | created_at | trackings:int |
----------------------------------------------------------------------
|         | 18      | Joe                | 10/10/13   | 1000          |
|         | 20      | Elise              | 10/10/14   | 2300          |
----------------------------------------------------------------------

Automating Elastic MapReduce (Pig robots)

Once you are happy with your script you’ll want to automate all of this. I currently do this with a cron task which, at regular intervals, uses the elastic-mapreduce-ruby lib to fire up an Elastic MapReduce job and point it at the Pig script to execute.

It’s important to note that I store the Pig scripts on S3, so it’s easy for elastic-mapreduce to find them.

Follow the instructions to install elastic-mapreduce-ruby: https://github.com/tc/elastic-mapreduce-ruby

To avoid having to call elastic-mapreduce with hundreds of arguments, a colleague has written a little Python wrapper to make it quick and easy to use: https://gist.github.com/2911006

You’ll need to configure where your elastic-mapreduce tool is installed AND where you want Elastic MapReduce to log to on S3 (this means you can debug your job if things go wrong!).
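
For comparison, calling elastic-mapreduce directly looks roughly like this (flags from memory of the old CLI; the log path and instance settings are only examples):

shell> elastic-mapreduce --create --name "average artist trackings" \
         --log-uri s3://songkick/logs/emr/ \
         --num-instances 4 --instance-type m1.large \
         --pig-script \
         --args s3://songkick/lib/stats/pig/average_artist_trackings_per_active_user.pig

The wrapper just bundles up that boilerplate, so the only thing you pass per job is the script path.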

Now all we need to do is pass the script the path to the pig script on S3.

./emrjob s3://songkick/lib/stats/pig/average_artist_trackings_per_active_user.pig
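
To run it nightly from cron, an entry along these lines works (the install path and schedule here are illustrative):

# m h dom mon dow   command
0 3 * * * /home/stats/bin/emrjob s3://songkick/lib/stats/pig/average_artist_trackings_per_active_user.pig >> /var/log/emrjob.log 2>&1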

Testing with PigUnit (Simulating Pigs)

Pig scripts can still take a long time to run, even with all that Hadoop magic. Thankfully there is a testing framework, PigUnit:

http://pig.apache.org/docs/r0.8.1/pigunit.html#Overview

Unfortunately this is where you have to step into writing Java. So I skipped it. Sshhh.

References

  1. Apache Pig official site: http://pig.apache.org

  2. Nearest Neighbours with Apache Pig and JRuby: http://thedatachef.blogspot.co.uk/2011/10/nearest-neighbors-with-apache-pig-and.html

  3. Helpers for messing with Elastic MapReduce in Ruby: https://github.com/tc/elastic-mapreduce-ruby

  4. mysqltsvdump: http://github.com/apeckham/mysqltsvdump/blob/master/mysqltsvdump.rb
