Notice the data is sorted by AccountId, but for a given account it is not sorted by SequenceId. One important thing to remember is that Hadoop does not preserve the order of the data when passing it to the nodes that run the reduce step. The map step will group the data by the specified key, but even if the data is sorted correctly on the server, there is no guarantee that order will survive into the reduce. If your analysis requires the data to be sorted, not just grouped by key, the sorting must be handled in the reduce step.

Now let's suppose you need to know the ordered sequence of products purchased for a given account. For clarity, each product is numbered to match its SequenceId. While this is achievable with the Python standard library alone, thankfully one of its modules, itertools, has a number of functions that make this quite elegant to code. First, let's create a simple map step, mapper.py.
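To make the problem concrete, here is a hypothetical slice of the input (the field layout AccountId,SequenceId,Product and all values are invented for illustration): the rows arrive sorted by AccountId, but within an account the SequenceId is out of order.

```python
# Hypothetical input: sorted by AccountId (field 0), but the SequenceId
# (field 1) within each account is not in order.
sample = """\
1001,3,productC
1001,1,productA
1001,2,productB
1002,2,productB
1002,1,productA
"""

rows = [line.split(',') for line in sample.splitlines()]

# The file is sorted by account...
accounts = [r[0] for r in rows]
print(accounts == sorted(accounts))  # → True

# ...but not by sequence within an account.
seqs_1001 = [r[1] for r in rows if r[0] == '1001']
print(seqs_1001 == sorted(seqs_1001))  # → False
```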
import sys

for line in sys.stdin:
    # Split the line by the delimiter
    aline = line.split(',')
    # Choose the first field as the map key
    key = aline[0]
    # Output the map key followed by a tab and the rest of the data
    print('%s\t%s' % (key, line.strip()))
This defines the AccountId as the key. Remember, including the SequenceId in the key will not help you: the Hadoop server would then partition on the compound key and scatter each account's records across the nodes. If you need to do an analysis by account, no single node would see all of that account's transactions.
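To see why, here is a minimal sketch of a hash partitioner, a deterministic stand-in for Hadoop's default (the records, the key layout, and the reducer count are all invented for illustration). Keying by AccountId alone sends every record for an account to the same reducer; a compound AccountId+SequenceId key generally does not.

```python
from zlib import crc32

def partition(key, num_reducers=4):
    # Deterministic stand-in for a hash partitioner: maps a key string
    # to a reducer index.
    return crc32(key.encode()) % num_reducers

records = ['1001,3,productC', '1001,1,productA', '1001,2,productB']

# Key by AccountId only: all three records land on one reducer.
by_account = {partition(r.split(',')[0]) for r in records}
print(len(by_account))  # → 1

# Key by AccountId,SequenceId: the same records may be split
# across several reducers.
by_account_seq = {partition(','.join(r.split(',')[:2])) for r in records}
```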
Now, here is the reduce step, reducer.py:
import sys
from itertools import groupby
from operator import itemgetter
First, we define how to parse the input data stream. Notice how simple it is to handle two different delimiters: the tab that defines the key for the Hadoop server, and the comma that separates the data fields.
def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        # Split off the map key (everything before the first tab)
        key1, line1 = line.rstrip().split(sepA, 1)
        # Split the remaining data into its fields
        aline = line1.split(sepB)
        # Create a second key to sort on within each group
        key2 = aline[sortBy]
        yield key1, key2, line1
The yield statement makes this function a generator, producing one record at a time: the account number, the sequence ID, and the entire data line.
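As a quick sanity check, the generator can be exercised on an in-memory stream (the two mapper-style lines below are invented for illustration):

```python
import io

# Self-contained copy of the parser above, applied to two hypothetical
# mapper output lines of the form "key<TAB>AccountId,SequenceId,Product".
def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        key1, line1 = line.rstrip().split(sepA, 1)
        aline = line1.split(sepB)
        key2 = aline[sortBy]
        yield key1, key2, line1

stream = io.StringIO('1001\t1001,2,productB\n1001\t1001,1,productA\n')
rows = list(read_mapper_output(stream, 1, '\t', ','))
print(rows[0])  # → ('1001', '2', '1001,2,productB')
```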
def main(separator='\t'):
    # Define the iterator
    data = read_mapper_output(sys.stdin, 1, separator, ',')
    # Loop through the iterator grouped by the first key
    for grpKey, grpDt in groupby(data, itemgetter(0)):
        # Set the product list blank
        products = ''
        # Sort the grouped data by the second key
        for key1, key2, line in sorted(grpDt, key=itemgetter(1)):
            # Populate your product list
            products = products + ',' + line
        print(grpKey + products)
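Putting the pieces together, here is a self-contained sketch of the reduce logic run on hypothetical mapper output (two accounts, with the sequence IDs deliberately out of order):

```python
import io
from itertools import groupby
from operator import itemgetter

# Same parser as in the reducer above
def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        key1, line1 = line.rstrip().split(sepA, 1)
        key2 = line1.split(sepB)[sortBy]
        yield key1, key2, line1

# Hypothetical mapper output: grouped by account, unsorted by sequence
mapped = io.StringIO(
    '1001\t1001,2,productB\n'
    '1001\t1001,1,productA\n'
    '1002\t1002,1,productA\n'
)

results = []
for grpKey, grpDt in groupby(read_mapper_output(mapped, 1, '\t', ','),
                             itemgetter(0)):
    products = ''
    # Sort each account's records by SequenceId before concatenating
    for key1, key2, line in sorted(grpDt, key=itemgetter(1)):
        products = products + ',' + line
    results.append(grpKey + products)

print(results[0])  # → '1001,1001,1,productA,1001,2,productB'
```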
Finally, we call the main function when the script is run directly:
if __name__ == "__main__":
    main()
Run the following to test the code locally. (Because test.data is already sorted by AccountId, the mapper output reaches the reducer grouped by key; for an unsorted file you would pipe the mapper output through sort first to mimic Hadoop's shuffle.)
cat test.data | python mapper.py | python reducer.py
Even complex problems can have an elegant and fast solution in Python.