Previously, I gave a simple example of a Python map/reduce program. In this example I will show you how to sort the data within a key using itertools, a powerful Python standard-library module focused on looping algorithms. This example is an extension of Michael Noll’s detailed Hadoop example. The data in this example is comma-delimited.
AccountId,SequenceId,Amount,Product
0123,9,12.50,Product9
0123,5,4.60,Product5
0123,7,54.00,Product7
0123,2,34.75,Product2
0123,3,6.34,Product3
0123,8,14.50,Product8
0123,1,52.56,Product1
0123,4,78.45,Product4
0123,6,89.50,Product6
0321,1,2.12,Product1
0321,8,90.50,Product8
0321,3,2.35,Product3
0321,4,56.25,Product4
0321,9,71.00,Product9
0321,2,24.75,Product2
0321,7,34.34,Product7
0321,6,34.23,Product6
0321,5,37.03,Product5
Notice the data is sorted by AccountId, but for a given account it is not sorted by SequenceId. One important thing to remember is that Hadoop does not preserve the order of the data when passing it to the nodes that run the reduce step. The map step will group the data by the specified key, but even if the data is sorted correctly on the server, it is not guaranteed that this order will be preserved in the reduce. If your analysis requires the data to be sorted, not just grouped by key, this must be handled in the reduce step. Now let's suppose you need to know the ordered sequence of products purchased for a given account. For clarity, I numbered each product in accordance with its SequenceId. While this is achievable with the Python base library alone, thankfully there is a module, itertools, whose functions make this quite elegant to code. First, let's create a simple map step, mapper.py.
#!/usr/bin/env python
import sys

for line in sys.stdin:
    # Split the line by the field delimiter
    aline = line.split(',')
    # Choose the first field (AccountId) as the map key
    key = aline[0]
    # Output the map key followed by a tab and the original line
    print('%s\t%s' % (key, line.strip()))
This defines the AccountId as the key. Remember, even if you included the SequenceId in the key it would not help you: the Hadoop server would then just scatter the individual records across all the nodes, and if you need to do an analysis by account, a given node would not see all of that account's transactions.
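As a quick sanity check, here is a minimal sketch of what one record looks like after the map step, using a made-up sample line rather than the data file:

```python
# A single hypothetical record run through the mapper logic
line = "0123,9,12.50,Product9"
# The first comma-separated field becomes the shuffle key
key = line.split(',')[0]
mapped = '%s\t%s' % (key, line.strip())
print(mapped)
```

The result is the key, then a tab, then the full original line — the format Hadoop streaming uses to decide which reducer receives the record.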
Now, here is the reduce step, reducer.py:
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter
First, we define how to parse the input data stream. Notice how simple it is to handle two different delimiters: the tab that defines the key for the Hadoop server, and the comma that separates the data fields.
def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        # Split off the map key at the first tab
        key1, line1 = line.rstrip().split(sepA, 1)
        aline = line1.split(sepB)
        # Create a second key to sort by within the group
        key2 = aline[sortBy]
        yield key1, key2, line1
The yield statement makes this function a generator. Each record it produces is a tuple of the account number, the sequence ID, and the entire data line.
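To see exactly what the generator yields, here is a small self-contained sketch that repeats the function above and feeds it two in-memory lines; the StringIO stand-in for sys.stdin is just for illustration:

```python
from io import StringIO

def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        key1, line1 = line.rstrip().split(sepA, 1)
        key2 = line1.split(sepB)[sortBy]
        yield key1, key2, line1

# Two lines exactly as the mapper would emit them
sample = StringIO("0123\t0123,9,12.50,Product9\n"
                  "0123\t0123,5,4.60,Product5\n")
for record in read_mapper_output(sample, 1, '\t', ','):
    print(record)
# ('0123', '9', '0123,9,12.50,Product9')
# ('0123', '5', '0123,5,4.60,Product5')
```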
def main(separator='\t'):
    # Define the iterator
    data = read_mapper_output(sys.stdin, 1, separator, ',')
    # Loop through the iterator, grouped by the first key (AccountId)
    for grpKey, grpDt in groupby(data, itemgetter(0)):
        # Set the product list blank
        products = ''
        # Sort the grouped data by the second key (SequenceId)
        for key1, key2, line in sorted(grpDt, key=itemgetter(1)):
            aline = line.split(',')
            # Append the product name to your product list
            products = products + ',' + aline[3]
        print(grpKey + products)
Finally, we call the main function when the script is run directly.
if __name__ == "__main__":
    main()
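One subtlety worth calling out: groupby only merges *consecutive* items that share a key. That is why the reducer can rely on Hadoop delivering each account's records contiguously, yet still has to sort within the group itself. A tiny illustration with made-up tuples:

```python
from itertools import groupby
from operator import itemgetter

# Note the second '0123' at the end: it forms a new group, because
# groupby never looks back at earlier, non-adjacent keys.
rows = [('0123', '9'), ('0123', '5'), ('0321', '1'), ('0123', '2')]
for key, grp in groupby(rows, itemgetter(0)):
    print(key, [seq for _, seq in grp])
# 0123 ['9', '5']
# 0321 ['1']
# 0123 ['2']
```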
Run the following to test the code:
cat test.data | python mapper.py | python reducer.py
Assuming test.data contains the data rows above (without the header line), it will output:

0123,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9
0321,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9
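If you want to sanity-check the logic without any Hadoop setup, the whole pipeline can also be simulated in a few lines of plain Python. This is a condensed sketch of the same map and reduce steps, run on a subset of the data above:

```python
from itertools import groupby
from operator import itemgetter

data = """0123,9,12.50,Product9
0123,5,4.60,Product5
0123,7,54.00,Product7
0321,3,2.35,Product3
0321,1,2.12,Product1
0321,2,24.75,Product2"""

# Map step: key each line by AccountId
mapped = ['%s\t%s' % (l.split(',')[0], l) for l in data.splitlines()]

# Reduce step: group by AccountId, sort within each group by
# SequenceId, then collect the product names in sequence order
out = []
for key, grp in groupby((m.split('\t', 1) for m in mapped), itemgetter(0)):
    lines = sorted((g[1] for g in grp), key=lambda l: l.split(',')[1])
    out.append(key + ',' + ','.join(l.split(',')[3] for l in lines))

print('\n'.join(out))
# 0123,Product5,Product7,Product9
# 0321,Product1,Product2,Product3
```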
Even complex problems have an elegant and fast solution with Python.