Sorting in Hadoop with Itertools

Previously, I gave a simple example of a Python map/reduce program. In this example I will show you how to sort the data within a key using itertools, a Python standard library module of building blocks for working with iterators. This example is an extension of Michael Noll’s detailed Hadoop example. The data in this example is comma-delimited.

AccountId,SequenceId,Amount,Product
0123,9,12.50,Product9
0123,5, 4.60,Product5
0123,7,54.00,Product7
0123,2,34.75,Product2
0123,3, 6.34,Product3
0123,8,14.50,Product8
0123,1,52.56,Product1
0123,4,78.45,Product4
0123,6,89.50,Product6
0321,1,2.12,Product1
0321,8,90.50,Product8
0321,3, 2.35,Product3
0321,4,56.25,Product4
0321,9,71.00,Product9
0321,2,24.75,Product2
0321,7,34.34,Product7
0321,6,34.23,Product6
0321,5,37.03,Product5       

Notice that the data is sorted by AccountId; however, for a given account the data is not sorted by SequenceId. One important thing to remember is that Hadoop does not preserve the order of the data when passing it to the nodes that run the reduce step. Records are grouped by the key the map step emits, but even if the input file is sorted correctly, there is no guarantee that the order within a key will be preserved in the reduce. If your analysis requires the data to be sorted, not just grouped by key, this must be handled in the reduce step.

Now suppose you need to know the ordered sequence of products purchased for a given account. For clarity, I numbered each product in accordance with its SequenceId. While this is easy to achieve with Python's built-ins alone, the itertools module has a number of functions that make it quite elegant to code. First, let’s create a simple map step, mapper.py.

#!/usr/bin/env python
import sys

for line in sys.stdin:
    # Split the line on the delimiter
    aline = line.split(',')
    # Choose the first field (AccountId) as the map key
    key = aline[0]
    # Output the map key followed by a tab and the rest of the data.
    # The parenthesized print works under both Python 2 and Python 3.
    print('%s\t%s' % (key, line.strip()))

This defines the AccountId as the key. Remember that including the SequenceId in the key will not help you. Hadoop would then treat each AccountId/SequenceId pair as a separate key and could send an account’s records to different nodes, so a given node would not be guaranteed to see all of that account’s transactions.
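
To make the point about keys concrete, here is a toy sketch of hash partitioning. It is not Hadoop's actual partitioner: zlib.crc32 is only an assumed stand-in for whatever hash the cluster uses, the reducer count of 4 is arbitrary, and the names partition and partitions_per_account are hypothetical. The records are a few (AccountId, SequenceId) pairs from the sample data.

```python
import zlib

def partition(key, num_reducers):
    # Stand-in for a hash partitioner: a stable hash of the key,
    # taken modulo the number of reducers (assumption, not Hadoop's code).
    return zlib.crc32(key.encode("utf-8")) % num_reducers

# (AccountId, SequenceId) pairs taken from the sample data
records = [("0123", "9"), ("0123", "5"), ("0123", "7"),
           ("0321", "1"), ("0321", "8")]

def partitions_per_account(make_key, num_reducers=4):
    # For each account, collect the set of partitions its records land in
    seen = {}
    for account, seq in records:
        part = partition(make_key(account, seq), num_reducers)
        seen.setdefault(account, set()).add(part)
    return seen

# Key = AccountId only: every record of an account hashes identically
by_account = partitions_per_account(lambda account, seq: account)
# Key = AccountId plus SequenceId: each record hashes on its own
by_composite = partitions_per_account(lambda account, seq: account + "," + seq)

print(by_account)
print(by_composite)
```

With the account-only key, each account always maps to exactly one partition, because identical keys produce identical hashes. With the combined key, nothing ties an account's records together, so they may be spread across several partitions.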

Now, here is the reduce step, reducer.py:

#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter

First, we define how to parse the input data stream. Notice how simple it is to handle two different delimiters: the tab that separates the key for Hadoop from the record, and the comma that separates the data fields.

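def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        # Split off the Hadoop key (before sepA) from the data record
        key1, line1 = line.rstrip().split(sepA, 1)
        # Split the data record, not the whole line, into fields
        aline = line1.split(sepB)
        # Create a second key to use on our data structure.
        # It is a string, so it sorts as text (fine for single-digit IDs).
        key2 = aline[sortBy]
        yield key1, key2, line1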

The yield statement makes this function a generator, so records are produced one at a time as the caller iterates. Each item is a tuple of the account number, the sequence ID, and the entire data line.
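
As an aside that is separate from reducer.py, the generator behaviour can be tried on an in-memory list instead of sys.stdin. This is a minimal sketch: the function is restated with illustrative parameter names (lines, sort_by, sep_a, sep_b), and the two sample lines are mapper output for rows from the data above.

```python
def read_mapper_output(lines, sort_by, sep_a, sep_b):
    for line in lines:
        # Separate the Hadoop key from the data record
        key1, line1 = line.rstrip().split(sep_a, 1)
        # Pick the sort field out of the data record
        key2 = line1.split(sep_b)[sort_by]
        yield key1, key2, line1

sample = [
    "0123\t0123,9,12.50,Product9\n",
    "0123\t0123,5, 4.60,Product5\n",
]

rows = read_mapper_output(sample, 1, "\t", ",")
first = next(rows)   # the body runs only as far as the first yield
rest = list(rows)    # consuming the generator produces the remaining tuples

print(first)  # ('0123', '9', '0123,9,12.50,Product9')
print(rest)   # [('0123', '5', '0123,5, 4.60,Product5')]
```

Because nothing is read until the caller asks for the next item, the reducer never has to hold the whole input in memory, only the records of the account it is currently sorting.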

def main(separator='\t'):
    # Define the iterator
    data = read_mapper_output(sys.stdin, 1, separator, ',')
    # Loop through the iterator grouped by the first key
    for grpKey, grpDt in groupby(data, itemgetter(0)):
        # Start with an empty product list
        products = ''
        # Sort the grouped data by the second key
        for key1, key2, line in sorted(grpDt, key=itemgetter(1)):
            aline = line.split(',')
            # Append this product to the list
            products = products + ',' + aline[3]
        # Print once per account, after all of its products are collected
        print(grpKey + products)
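
One caveat about the sort key, shown as a standalone sketch rather than as part of reducer.py: the sequence ID arrives as text, and text sorts character by character. The rows below are hypothetical (the sample data only has IDs 1 through 9), and converting the key with int() is one possible adjustment, assuming every ID is a whole number.

```python
from operator import itemgetter

# Hypothetical rows for one account: (account, sequence id, product)
rows = [
    ("0123", "10", "Product10"),
    ("0123", "2", "Product2"),
    ("0123", "1", "Product1"),
]

# Sorting on the raw string puts "10" before "2"
as_text = [r[2] for r in sorted(rows, key=itemgetter(1))]
# Converting the key to int restores numeric order
as_number = [r[2] for r in sorted(rows, key=lambda r: int(r[1]))]

print(as_text)    # ['Product1', 'Product10', 'Product2']
print(as_number)  # ['Product1', 'Product2', 'Product10']
```

For the single-digit IDs in this example both orders agree, which is why the string key is enough here.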

Finally, we call main() when the script is run directly.

 
if __name__ == "__main__":
    main() 

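Run the following to test the code. Hadoop sorts the map output by key before the reduce step, and groupby relies on this because it only groups consecutive records with the same key. The sample data is already ordered by AccountId, so no sort is needed here; for other test data, add sort -k1,1 between the two scripts: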

cat test.data | python mapper.py | python reducer.py

It will output:

0123,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9
0321,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9
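
The whole pipeline can also be imitated in memory, which makes it easy to see why the reduce step needs its input ordered by key. This is a rough sketch, not a replacement for the scripts above: map_step and reduce_step are hypothetical names, the four rows are taken from the sample data but deliberately interleaved by account, and sorted() stands in for the sort Hadoop performs between map and reduce.

```python
from itertools import groupby
from operator import itemgetter

# Four rows from the sample data, interleaved by account on purpose
raw = [
    "0321,2,24.75,Product2",
    "0123,3, 6.34,Product3",
    "0321,1,2.12,Product1",
    "0123,1,52.56,Product1",
]

def map_step(lines):
    # Emit "AccountId<TAB>record", as mapper.py does
    for line in lines:
        yield "%s\t%s" % (line.split(",")[0], line.strip())

def reduce_step(lines):
    # Split each line into [key, record], then group consecutive equal keys
    parsed = (line.split("\t", 1) for line in lines)
    out = []
    for account, group in groupby(parsed, itemgetter(0)):
        # Sort this account's records by numeric SequenceId
        fields = sorted((rec.split(",") for _, rec in group),
                        key=lambda f: int(f[1]))
        out.append(",".join([account] + [f[3] for f in fields]))
    return out

mapped = list(map_step(raw))
unsorted_result = reduce_step(mapped)        # groups are split apart
sorted_result = reduce_step(sorted(mapped))  # one group per account

print(unsorted_result)
print(sorted_result)  # ['0123,Product1,Product3', '0321,Product1,Product2']
```

Without the sort, groupby sees the account key change on every line and emits four separate groups; with it, each account appears once with its products in sequence order.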

Even complex problems have an elegant and fast solution with Python.